threadshare: introduce TaskImpl trait

TaskImpl is the trait for specific Task behaviour. It is the basis
of a new Task model. The main motivation for this model is to ease
threadsafe implementations of state transitions.

See https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/298
This commit is contained in:
François Laignel 2020-04-20 21:35:06 +02:00
parent b3138ad041
commit 1bea2ad279
16 changed files with 3437 additions and 2138 deletions

View file

@ -21,7 +21,7 @@ gst-net = { package = "gstreamer-net", git = "https://gitlab.freedesktop.org/gst
gst-rtp = { package = "gstreamer-rtp", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" }
gstreamer-sys = { git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs-sys" }
pin-project = "0.4"
tokio = { git = "https://github.com/fengalin/tokio", tag = "tokio-0.2.12-throttling", features = ["io-util", "macros", "rt-core", "sync", "stream", "time", "tcp", "udp", "rt-util"] }
tokio = { git = "https://github.com/fengalin/tokio", branch = "fengalin/throttling", features = ["io-util", "macros", "rt-core", "sync", "stream", "time", "tcp", "udp", "rt-util"] }
futures = "0.3"
lazy_static = "1.0"
rand = "0.7"

View file

@ -17,6 +17,7 @@
// Boston, MA 02110-1335, USA.
use futures::channel::mpsc;
use futures::future::BoxFuture;
use futures::lock::Mutex as FutMutex;
use futures::prelude::*;
@ -37,7 +38,7 @@ use std::sync::Mutex as StdMutex;
use std::u32;
use crate::runtime::prelude::*;
use crate::runtime::{Context, PadSrc, PadSrcRef, Task};
use crate::runtime::{Context, PadSrc, PadSrcRef, PadSrcWeak, Task, TaskState};
const DEFAULT_CONTEXT: &str = "";
const DEFAULT_CONTEXT_WAIT: u32 = 0;
@ -167,16 +168,12 @@ impl AppSrcPadHandler {
.caps = caps;
}
fn reset_state(&self) {
*self.0.state.try_lock().expect("State locked elsewhere") = Default::default();
async fn reset_state(&self) {
*self.0.state.lock().await = Default::default();
}
fn set_need_segment(&self) {
self.0
.state
.try_lock()
.expect("State locked elsewhere")
.need_segment = true;
async fn set_need_segment(&self) {
self.0.state.lock().await.need_segment = true;
}
async fn push_prelude(&self, pad: &PadSrcRef<'_>, _element: &gst::Element) {
@ -209,7 +206,7 @@ impl AppSrcPadHandler {
}
async fn push_item(
self,
&self,
pad: &PadSrcRef<'_>,
element: &gst::Element,
item: StreamItem,
@ -224,9 +221,17 @@ impl AppSrcPadHandler {
pad.push(buffer).await
}
StreamItem::Event(event) => {
gst_log!(CAT, obj: pad.gst_pad(), "Forwarding {:?}", event);
pad.push_event(event).await;
Ok(gst::FlowSuccess::Ok)
match event.view() {
gst::EventView::Eos(_) => {
// Let the caller push the event
Err(gst::FlowError::Eos)
}
_ => {
gst_log!(CAT, obj: pad.gst_pad(), "Forwarding {:?}", event);
pad.push_event(event).await;
Ok(gst::FlowSuccess::Ok)
}
}
}
}
}
@ -239,7 +244,7 @@ impl PadSrcHandler for AppSrcPadHandler {
&self,
pad: &PadSrcRef,
appsrc: &AppSrc,
element: &gst::Element,
_element: &gst::Element,
event: gst::Event,
) -> bool {
use gst::EventView;
@ -248,11 +253,11 @@ impl PadSrcHandler for AppSrcPadHandler {
let ret = match event.view() {
EventView::FlushStart(..) => {
appsrc.flush_start(element);
appsrc.task.flush_start();
true
}
EventView::FlushStop(..) => {
appsrc.flush_stop(element);
appsrc.task.flush_stop();
true
}
EventView::Reconfigure(..) => true,
@ -316,11 +321,112 @@ impl PadSrcHandler for AppSrcPadHandler {
}
}
#[derive(Debug, Eq, PartialEq)]
enum AppSrcState {
Paused,
RejectBuffers,
Started,
#[derive(Debug)]
struct AppSrcTask {
element: gst::Element,
src_pad: PadSrcWeak,
src_pad_handler: AppSrcPadHandler,
receiver: mpsc::Receiver<StreamItem>,
}
impl AppSrcTask {
fn new(
element: &gst::Element,
src_pad: &PadSrc,
src_pad_handler: &AppSrcPadHandler,
receiver: mpsc::Receiver<StreamItem>,
) -> Self {
AppSrcTask {
element: element.clone(),
src_pad: src_pad.downgrade(),
src_pad_handler: src_pad_handler.clone(),
receiver,
}
}
}
impl AppSrcTask {
fn flush(&mut self) {
// Purge the channel
while let Ok(Some(_item)) = self.receiver.try_next() {}
}
}
impl TaskImpl for AppSrcTask {
fn iterate(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> {
async move {
let item = self.receiver.next().await;
let item = match item {
Some(item) => item,
None => {
gst_error!(CAT, obj: &self.element, "SrcPad channel aborted");
gst_element_error!(
&self.element,
gst::StreamError::Failed,
("Internal data stream error"),
["streaming stopped, reason: channel aborted"]
);
return Err(gst::FlowError::Flushing);
}
};
let pad = self.src_pad.upgrade().expect("PadSrc no longer exists");
let res = self
.src_pad_handler
.push_item(&pad, &self.element, item)
.await;
match res {
Ok(_) => {
gst_log!(CAT, obj: &self.element, "Successfully pushed item");
}
Err(gst::FlowError::Eos) => {
gst_debug!(CAT, obj: &self.element, "EOS");
let eos = gst::Event::new_eos().build();
pad.push_event(eos).await;
}
Err(gst::FlowError::Flushing) => {
gst_debug!(CAT, obj: &self.element, "Flushing");
}
Err(err) => {
gst_error!(CAT, obj: &self.element, "Got error {}", err);
gst_element_error!(
&self.element,
gst::StreamError::Failed,
("Internal data stream error"),
["streaming stopped, reason {}", err]
);
}
}
res.map(drop)
}
.boxed()
}
fn stop(&mut self) -> BoxFuture<'_, ()> {
async move {
gst_log!(CAT, obj: &self.element, "Stopping task");
self.flush();
self.src_pad_handler.reset_state().await;
gst_log!(CAT, obj: &self.element, "Task stopped");
}
.boxed()
}
fn flush_start(&mut self) -> BoxFuture<'_, ()> {
async move {
gst_log!(CAT, obj: &self.element, "Starting task flush");
self.flush();
self.src_pad_handler.set_need_segment().await;
gst_log!(CAT, obj: &self.element, "Task flush started");
}
.boxed()
}
}
#[derive(Debug)]
@ -328,17 +434,15 @@ struct AppSrc {
src_pad: PadSrc,
src_pad_handler: AppSrcPadHandler,
task: Task,
state: StdMutex<AppSrcState>,
sender: StdMutex<Option<mpsc::Sender<StreamItem>>>,
receiver: StdMutex<Option<Arc<FutMutex<mpsc::Receiver<StreamItem>>>>>,
settings: StdMutex<Settings>,
}
impl AppSrc {
fn push_buffer(&self, element: &gst::Element, mut buffer: gst::Buffer) -> bool {
let state = self.state.lock().unwrap();
if *state == AppSrcState::RejectBuffers {
gst_debug!(CAT, obj: element, "Rejecting buffer due to pad state");
let state = self.task.lock_state();
if *state != TaskState::Started && *state != TaskState::Paused {
gst_debug!(CAT, obj: element, "Rejecting buffer due to element state");
return false;
}
@ -411,188 +515,51 @@ impl AppSrc {
let (sender, receiver) = mpsc::channel(max_buffers);
*self.sender.lock().unwrap() = Some(sender);
*self.receiver.lock().unwrap() = Some(Arc::new(FutMutex::new(receiver)));
self.task.prepare(context).map_err(|err| {
gst_error_msg!(
gst::ResourceError::OpenRead,
["Error preparing Task: {:?}", err]
)
})?;
self.src_pad_handler.prepare(settings.caps.clone());
self.task
.prepare(
AppSrcTask::new(element, &self.src_pad, &self.src_pad_handler, receiver),
context,
)
.map_err(|err| {
gst_error_msg!(
gst::ResourceError::OpenRead,
["Error preparing Task: {:?}", err]
)
})?;
gst_debug!(CAT, obj: element, "Prepared");
Ok(())
}
fn unprepare(&self, element: &gst::Element) -> Result<(), ()> {
fn unprepare(&self, element: &gst::Element) {
gst_debug!(CAT, obj: element, "Unpreparing");
*self.sender.lock().unwrap() = None;
*self.receiver.lock().unwrap() = None;
self.task.unprepare().unwrap();
gst_debug!(CAT, obj: element, "Unprepared");
Ok(())
}
fn stop(&self, element: &gst::Element) -> Result<(), ()> {
let mut state = self.state.lock().unwrap();
fn stop(&self, element: &gst::Element) {
gst_debug!(CAT, obj: element, "Stopping");
self.flush(element);
self.src_pad_handler.reset_state();
*state = AppSrcState::RejectBuffers;
gst_debug!(CAT, obj: element, "Stopped");
Ok(())
}
fn flush(&self, element: &gst::Element) {
gst_log!(CAT, obj: element, "Flushing");
self.task.stop();
let receiver = self.receiver.lock().unwrap();
let mut receiver = receiver
.as_ref()
.unwrap()
.try_lock()
.expect("receiver locked elsewhere");
// Purge the channel
loop {
match receiver.try_next() {
Ok(Some(_item)) => {
gst_log!(CAT, obj: element, "Dropping pending item");
}
Err(_) => {
gst_log!(CAT, obj: element, "No more pending item");
break;
}
Ok(None) => {
panic!("Channel sender dropped");
}
}
}
self.src_pad_handler.set_need_segment();
gst_log!(CAT, obj: element, "Flushed");
gst_debug!(CAT, obj: element, "Stopped");
}
fn start(&self, element: &gst::Element) -> Result<(), ()> {
let mut state = self.state.lock().unwrap();
if *state == AppSrcState::Started {
gst_debug!(CAT, obj: element, "Already started");
return Ok(());
}
fn start(&self, element: &gst::Element) {
gst_debug!(CAT, obj: element, "Starting");
self.start_task(element);
*state = AppSrcState::Started;
self.task.start();
gst_debug!(CAT, obj: element, "Started");
Ok(())
}
fn start_task(&self, element: &gst::Element) {
let src_pad_handler = self.src_pad_handler.clone();
let pad_weak = self.src_pad.downgrade();
let element = element.clone();
let receiver = Arc::clone(self.receiver.lock().unwrap().as_ref().expect("No receiver"));
self.task.start(move || {
let src_pad_handler = src_pad_handler.clone();
let pad_weak = pad_weak.clone();
let element = element.clone();
let receiver = Arc::clone(&receiver);
async move {
let item = receiver.lock().await.next().await;
let pad = pad_weak.upgrade().expect("PadSrc no longer exists");
let item = match item {
Some(item) => item,
None => {
gst_log!(CAT, obj: pad.gst_pad(), "SrcPad channel aborted");
return glib::Continue(false);
}
};
match src_pad_handler.push_item(&pad, &element, item).await {
Ok(_) => {
gst_log!(CAT, obj: pad.gst_pad(), "Successfully pushed item");
glib::Continue(true)
}
Err(gst::FlowError::Eos) => {
gst_debug!(CAT, obj: pad.gst_pad(), "EOS");
let eos = gst::Event::new_eos().build();
pad.push_event(eos).await;
glib::Continue(false)
}
Err(gst::FlowError::Flushing) => {
gst_debug!(CAT, obj: pad.gst_pad(), "Flushing");
glib::Continue(false)
}
Err(err) => {
gst_error!(CAT, obj: pad.gst_pad(), "Got error {}", err);
gst_element_error!(
&element,
gst::StreamError::Failed,
("Internal data stream error"),
["streaming stopped, reason {}", err]
);
glib::Continue(false)
}
}
}
});
}
fn flush_stop(&self, element: &gst::Element) {
let mut state = self.state.lock().unwrap();
if *state == AppSrcState::Started {
gst_debug!(CAT, obj: element, "Already started");
return;
}
gst_debug!(CAT, obj: element, "Stopping Flush");
self.flush(element);
self.start_task(element);
*state = AppSrcState::Started;
gst_debug!(CAT, obj: element, "Flush Stopped");
}
fn flush_start(&self, element: &gst::Element) {
let mut state = self.state.lock().unwrap();
gst_debug!(CAT, obj: element, "Starting Flush");
self.task.cancel();
*state = AppSrcState::RejectBuffers;
gst_debug!(CAT, obj: element, "Flush Started");
}
fn pause(&self, element: &gst::Element) -> Result<(), ()> {
let mut state = self.state.lock().unwrap();
fn pause(&self, element: &gst::Element) {
gst_debug!(CAT, obj: element, "Pausing");
self.task.pause();
*state = AppSrcState::Paused;
gst_debug!(CAT, obj: element, "Paused");
Ok(())
}
}
@ -671,9 +638,7 @@ impl ObjectSubclass for AppSrc {
),
src_pad_handler,
task: Task::default(),
state: StdMutex::new(AppSrcState::RejectBuffers),
sender: StdMutex::new(None),
receiver: StdMutex::new(None),
settings: StdMutex::new(Settings::default()),
}
}
@ -749,10 +714,10 @@ impl ElementImpl for AppSrc {
})?;
}
gst::StateChange::PlayingToPaused => {
self.pause(element).map_err(|_| gst::StateChangeError)?;
self.pause(element);
}
gst::StateChange::ReadyToNull => {
self.unprepare(element).map_err(|_| gst::StateChangeError)?;
self.unprepare(element);
}
_ => (),
}
@ -764,13 +729,13 @@ impl ElementImpl for AppSrc {
success = gst::StateChangeSuccess::NoPreroll;
}
gst::StateChange::PausedToPlaying => {
self.start(element).map_err(|_| gst::StateChangeError)?;
self.start(element);
}
gst::StateChange::PlayingToPaused => {
success = gst::StateChangeSuccess::NoPreroll;
}
gst::StateChange::PausedToReady => {
self.stop(element).map_err(|_| gst::StateChangeError)?;
self.stop(element);
}
_ => (),
}

View file

@ -67,7 +67,6 @@ impl DataQueueItem {
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum DataQueueState {
Paused,
Started,
Stopped,
}
@ -137,18 +136,6 @@ impl DataQueue {
inner.wake();
}
pub fn pause(&self) {
let mut inner = self.0.lock().unwrap();
if inner.state == DataQueueState::Paused {
gst_debug!(DATA_QUEUE_CAT, obj: &inner.element, "Data queue already Paused");
return;
}
gst_debug!(DATA_QUEUE_CAT, obj: &inner.element, "Pausing data queue");
assert_eq!(DataQueueState::Started, inner.state);
inner.state = DataQueueState::Paused;
inner.wake();
}
pub fn stop(&self) {
let mut inner = self.0.lock().unwrap();
if inner.state == DataQueueState::Stopped {
@ -163,7 +150,6 @@ impl DataQueue {
pub fn clear(&self) {
let mut inner = self.0.lock().unwrap();
assert_eq!(inner.state, DataQueueState::Paused);
gst_debug!(DATA_QUEUE_CAT, obj: &inner.element, "Clearing data queue");
let src_pad = inner.src_pad.clone();
@ -259,10 +245,6 @@ impl DataQueue {
return Some(item);
}
},
DataQueueState::Paused => {
gst_debug!(DATA_QUEUE_CAT, obj: &inner.element, "Data queue Paused");
return None;
}
DataQueueState::Stopped => {
gst_debug!(DATA_QUEUE_CAT, obj: &inner.element, "Data queue Stopped");
return None;

View file

@ -662,7 +662,7 @@ impl PadSinkHandler for SinkHandler {
gst_log!(CAT, obj: pad.gst_pad(), "Handling {:?}", event);
if let EventView::FlushStart(..) = event.view() {
jb.task.cancel();
jb.task.flush_start();
}
gst_log!(CAT, obj: pad.gst_pad(), "Forwarding {:?}", event);
@ -699,7 +699,7 @@ impl PadSinkHandler for SinkHandler {
.unwrap();
}
EventView::FlushStop(..) => {
jb.flush_stop(&element);
jb.task.flush_stop();
}
EventView::Eos(..) => {
let mut state = jb.state.lock().unwrap();
@ -966,7 +966,7 @@ impl PadSrcHandler for SrcHandler {
&self,
pad: &PadSrcRef,
jb: &JitterBuffer,
element: &gst::Element,
_element: &gst::Element,
event: gst::Event,
) -> bool {
use gst::EventView;
@ -975,10 +975,10 @@ impl PadSrcHandler for SrcHandler {
match event.view() {
EventView::FlushStart(..) => {
jb.task.cancel();
jb.task.flush_start();
}
EventView::FlushStop(..) => {
jb.flush_stop(element);
jb.task.flush_stop();
}
_ => (),
}
@ -1104,6 +1104,203 @@ impl Default for State {
}
}
struct JitterBufferTask {
element: gst::Element,
src_pad_handler: SrcHandler,
sink_pad_handler: SinkHandler,
}
impl JitterBufferTask {
fn new(
element: &gst::Element,
src_pad_handler: &SrcHandler,
sink_pad_handler: &SinkHandler,
) -> Self {
JitterBufferTask {
element: element.clone(),
src_pad_handler: src_pad_handler.clone(),
sink_pad_handler: sink_pad_handler.clone(),
}
}
}
impl TaskImpl for JitterBufferTask {
fn start(&mut self) -> BoxFuture<'_, ()> {
async move {
gst_log!(CAT, obj: &self.element, "Starting task");
self.src_pad_handler.clear();
self.sink_pad_handler.clear();
let jb = JitterBuffer::from_instance(&self.element);
*jb.state.lock().unwrap() = State::default();
gst_log!(CAT, obj: &self.element, "Task started");
}
.boxed()
}
fn iterate(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> {
async move {
let jb = JitterBuffer::from_instance(&self.element);
let (latency, context_wait) = {
let settings = jb.settings.lock().unwrap();
(
settings.latency_ms as u64 * gst::MSECOND,
settings.context_wait as u64 * gst::MSECOND,
)
};
loop {
let delay_fut = {
let mut state = jb.state.lock().unwrap();
let (_, next_wakeup) = self.src_pad_handler.get_next_wakeup(
&self.element,
&state,
latency,
context_wait,
);
let (delay_fut, abort_handle) = match next_wakeup {
Some((_, delay)) if delay == Duration::from_nanos(0) => (None, None),
_ => {
let (delay_fut, abort_handle) = abortable(async move {
match next_wakeup {
Some((_, delay)) => {
runtime::time::delay_for(delay).await;
}
None => {
future::pending::<()>().await;
}
};
});
let next_wakeup =
next_wakeup.map(|w| w.0).unwrap_or(gst::CLOCK_TIME_NONE);
(Some(delay_fut), Some((next_wakeup, abort_handle)))
}
};
state.wait_handle = abort_handle;
delay_fut
};
// Got aborted, reschedule if needed
if let Some(delay_fut) = delay_fut {
gst_debug!(CAT, obj: &self.element, "Waiting");
if let Err(Aborted) = delay_fut.await {
gst_debug!(CAT, obj: &self.element, "Waiting aborted");
return Ok(());
}
}
let (head_pts, head_seq) = {
let state = jb.state.lock().unwrap();
//
// Check earliest PTS as we have just taken the lock
let (now, next_wakeup) = self.src_pad_handler.get_next_wakeup(
&self.element,
&state,
latency,
context_wait,
);
gst_debug!(
CAT,
obj: &self.element,
"Woke up at {}, earliest_pts {}",
now,
state.earliest_pts
);
if let Some((next_wakeup, _)) = next_wakeup {
if next_wakeup > now {
// Reschedule and wait a bit longer in the next iteration
return Ok(());
}
} else {
return Ok(());
}
let (head_pts, head_seq) = state.jbuf.borrow().peek();
(head_pts, head_seq)
};
let res = self.src_pad_handler.pop_and_push(&self.element).await;
{
let mut state = jb.state.lock().unwrap();
state.last_res = res;
if head_pts == state.earliest_pts && head_seq == state.earliest_seqnum {
let (earliest_pts, earliest_seqnum) = state.jbuf.borrow().find_earliest();
state.earliest_pts = earliest_pts;
state.earliest_seqnum = earliest_seqnum;
}
if res.is_ok() {
// Return and reschedule if the next packet would be in the future
// Check earliest PTS as we have just taken the lock
let (now, next_wakeup) = self.src_pad_handler.get_next_wakeup(
&self.element,
&state,
latency,
context_wait,
);
if let Some((next_wakeup, _)) = next_wakeup {
if next_wakeup > now {
// Reschedule and wait a bit longer in the next iteration
return Ok(());
}
} else {
return Ok(());
}
}
}
if let Err(err) = res {
match err {
gst::FlowError::Eos => {
gst_debug!(CAT, obj: &self.element, "Pushing EOS event");
let event = gst::Event::new_eos().build();
let _ = jb.src_pad.push_event(event).await;
}
gst::FlowError::Flushing => gst_debug!(CAT, obj: &self.element, "Flushing"),
err => gst_error!(CAT, obj: &self.element, "Error {}", err),
}
return Err(err);
}
}
}
.boxed()
}
fn stop(&mut self) -> BoxFuture<'_, ()> {
async move {
gst_log!(CAT, obj: &self.element, "Stopping task");
let jb = JitterBuffer::from_instance(&self.element);
let mut jb_state = jb.state.lock().unwrap();
if let Some((_, abort_handle)) = jb_state.wait_handle.take() {
abort_handle.abort();
}
self.src_pad_handler.clear();
self.sink_pad_handler.clear();
*jb_state = State::default();
gst_log!(CAT, obj: &self.element, "Task stopped");
}
.boxed()
}
}
struct JitterBuffer {
sink_pad: PadSink,
src_pad: PadSrc,
@ -1139,12 +1336,17 @@ impl JitterBuffer {
Context::acquire(&settings.context, settings.context_wait).unwrap()
};
self.task.prepare(context).map_err(|err| {
gst_error_msg!(
gst::ResourceError::OpenRead,
["Error preparing Task: {:?}", err]
self.task
.prepare(
JitterBufferTask::new(element, &self.src_pad_handler, &self.sink_pad_handler),
context,
)
})?;
.map_err(|err| {
gst_error_msg!(
gst::ResourceError::OpenRead,
["Error preparing Task: {:?}", err]
)
})?;
gst_info!(CAT, obj: element, "Prepared");
@ -1159,187 +1361,15 @@ impl JitterBuffer {
fn start(&self, element: &gst::Element) {
gst_debug!(CAT, obj: element, "Starting");
*self.state.lock().unwrap() = State::default();
self.sink_pad_handler.clear();
self.src_pad_handler.clear();
self.start_task(element);
self.task.start();
gst_debug!(CAT, obj: element, "Started");
}
fn start_task(&self, element: &gst::Element) {
let src_pad_handler = self.src_pad_handler.clone();
let element = element.clone();
self.task.start(move || {
let src_pad_handler = src_pad_handler.clone();
let element = element.clone();
async move {
let jb = JitterBuffer::from_instance(&element);
let (latency, context_wait) = {
let settings = jb.settings.lock().unwrap();
(
settings.latency_ms as u64 * gst::MSECOND,
settings.context_wait as u64 * gst::MSECOND,
)
};
loop {
let delay_fut = {
let mut state = jb.state.lock().unwrap();
let (_, next_wakeup) = src_pad_handler.get_next_wakeup(
&element,
&state,
latency,
context_wait,
);
let (delay_fut, abort_handle) = match next_wakeup {
Some((_, delay)) if delay == Duration::from_nanos(0) => (None, None),
_ => {
let (delay_fut, abort_handle) = abortable(async move {
match next_wakeup {
Some((_, delay)) => {
runtime::time::delay_for(delay).await;
}
None => {
future::pending::<()>().await;
}
};
});
let next_wakeup =
next_wakeup.map(|w| w.0).unwrap_or(gst::CLOCK_TIME_NONE);
(Some(delay_fut), Some((next_wakeup, abort_handle)))
}
};
state.wait_handle = abort_handle;
delay_fut
};
// Got aborted, reschedule if needed
if let Some(delay_fut) = delay_fut {
gst_debug!(CAT, obj: &element, "Waiting");
if let Err(Aborted) = delay_fut.await {
gst_debug!(CAT, obj: &element, "Waiting aborted");
return glib::Continue(true);
}
}
let (head_pts, head_seq) = {
let state = jb.state.lock().unwrap();
//
// Check earliest PTS as we have just taken the lock
let (now, next_wakeup) = src_pad_handler.get_next_wakeup(
&element,
&state,
latency,
context_wait,
);
gst_debug!(
CAT,
obj: &element,
"Woke up at {}, earliest_pts {}",
now,
state.earliest_pts
);
if let Some((next_wakeup, _)) = next_wakeup {
if next_wakeup > now {
// Reschedule and wait a bit longer in the next iteration
return glib::Continue(true);
}
} else {
return glib::Continue(true);
}
let (head_pts, head_seq) = state.jbuf.borrow().peek();
(head_pts, head_seq)
};
let res = src_pad_handler.pop_and_push(&element).await;
{
let mut state = jb.state.lock().unwrap();
state.last_res = res;
if head_pts == state.earliest_pts && head_seq == state.earliest_seqnum {
let (earliest_pts, earliest_seqnum) =
state.jbuf.borrow().find_earliest();
state.earliest_pts = earliest_pts;
state.earliest_seqnum = earliest_seqnum;
}
if res.is_ok() {
// Return and reschedule if the next packet would be in the future
// Check earliest PTS as we have just taken the lock
let (now, next_wakeup) = src_pad_handler.get_next_wakeup(
&element,
&state,
latency,
context_wait,
);
if let Some((next_wakeup, _)) = next_wakeup {
if next_wakeup > now {
// Reschedule and wait a bit longer in the next iteration
return glib::Continue(true);
}
} else {
return glib::Continue(true);
}
}
}
match res {
Ok(_) => (),
Err(gst::FlowError::Eos) => {
gst_debug!(CAT, obj: &element, "Pushing EOS event",);
let event = gst::Event::new_eos().build();
let _ = jb.src_pad.push_event(event).await;
return glib::Continue(false);
}
Err(gst::FlowError::Flushing) => {
gst_debug!(CAT, obj: &element, "Flushing",);
return glib::Continue(false);
}
Err(err) => {
gst_error!(CAT, obj: &element, "Error {}", err,);
return glib::Continue(false);
}
}
}
}
});
}
fn stop(&self, element: &gst::Element) {
gst_debug!(CAT, obj: element, "Stopping");
if let Some((_, abort_handle)) = self.state.lock().unwrap().wait_handle.take() {
abort_handle.abort();
}
self.task.stop();
self.src_pad_handler.clear();
self.sink_pad_handler.clear();
*self.state.lock().unwrap() = State::default();
gst_debug!(CAT, obj: element, "Stopped");
}
fn flush_stop(&self, element: &gst::Element) {
self.task.stop();
self.start(element);
}
}
impl ObjectSubclass for JitterBuffer {

View file

@ -41,7 +41,7 @@ use crate::runtime::{
Context, PadSink, PadSinkRef, PadSinkWeak, PadSrc, PadSrcRef, PadSrcWeak, Task,
};
use super::dataqueue::{DataQueue, DataQueueItem, DataQueueState};
use super::dataqueue::{DataQueue, DataQueueItem};
lazy_static! {
static ref PROXY_CONTEXTS: StdMutex<HashMap<String, Weak<StdMutex<ProxyContextInner>>>> =
@ -357,7 +357,7 @@ impl PadSinkHandler for ProxySinkPadHandler {
};
if let EventView::FlushStart(..) = event.view() {
proxysink.stop(&element).unwrap();
proxysink.stop(&element);
}
if let Some(src_pad) = src_pad {
@ -391,7 +391,7 @@ impl PadSinkHandler for ProxySinkPadHandler {
let _ =
element.post_message(&gst::Message::new_eos().src(Some(&element)).build());
}
EventView::FlushStop(..) => proxysink.start(&element).unwrap(),
EventView::FlushStop(..) => proxysink.start(&element),
_ => (),
}
@ -421,69 +421,56 @@ lazy_static! {
}
impl ProxySink {
fn schedule_pending_queue(
&self,
element: &gst::Element,
pending_queue: &mut PendingQueue,
) -> impl Future<Output = ()> {
gst_log!(SINK_CAT, obj: element, "Scheduling pending queue now");
async fn schedule_pending_queue(&self, element: &gst::Element) {
loop {
let more_queue_space_receiver = {
let proxy_ctx = self.proxy_ctx.lock().unwrap();
let mut shared_ctx = proxy_ctx.as_ref().unwrap().lock_shared();
pending_queue.scheduled = true;
gst_log!(SINK_CAT, obj: element, "Trying to empty pending queue");
let element = element.clone();
async move {
let sink = Self::from_instance(&element);
let ProxyContextInner {
pending_queue: ref mut pq,
ref dataqueue,
..
} = *shared_ctx;
loop {
let more_queue_space_receiver = {
let proxy_ctx = sink.proxy_ctx.lock().unwrap();
let mut shared_ctx = proxy_ctx.as_ref().unwrap().lock_shared();
gst_log!(SINK_CAT, obj: &element, "Trying to empty pending queue");
let ProxyContextInner {
pending_queue: ref mut pq,
ref dataqueue,
..
} = *shared_ctx;
if let Some(ref mut pending_queue) = *pq {
if let Some(ref dataqueue) = dataqueue {
let mut failed_item = None;
while let Some(item) = pending_queue.items.pop_front() {
if let Err(item) = dataqueue.push(item) {
failed_item = Some(item);
break;
}
if let Some(ref mut pending_queue) = *pq {
if let Some(ref dataqueue) = dataqueue {
let mut failed_item = None;
while let Some(item) = pending_queue.items.pop_front() {
if let Err(item) = dataqueue.push(item) {
failed_item = Some(item);
break;
}
}
if let Some(failed_item) = failed_item {
pending_queue.items.push_front(failed_item);
let (sender, receiver) = oneshot::channel();
pending_queue.more_queue_space_sender = Some(sender);
receiver
} else {
gst_log!(SINK_CAT, obj: &element, "Pending queue is empty now");
*pq = None;
return;
}
} else {
if let Some(failed_item) = failed_item {
pending_queue.items.push_front(failed_item);
let (sender, receiver) = oneshot::channel();
pending_queue.more_queue_space_sender = Some(sender);
receiver
} else {
gst_log!(SINK_CAT, obj: element, "Pending queue is empty now");
*pq = None;
return;
}
} else {
gst_log!(SINK_CAT, obj: &element, "Flushing, dropping pending queue");
*pq = None;
return;
}
};
let (sender, receiver) = oneshot::channel();
pending_queue.more_queue_space_sender = Some(sender);
gst_log!(SINK_CAT, obj: &element, "Waiting for more queue space");
let _ = more_queue_space_receiver.await;
}
receiver
}
} else {
gst_log!(SINK_CAT, obj: element, "Flushing, dropping pending queue");
*pq = None;
return;
}
};
gst_log!(SINK_CAT, obj: element, "Waiting for more queue space");
let _ = more_queue_space_receiver.await;
}
}
@ -563,7 +550,10 @@ impl ProxySink {
);
if schedule_now {
let wait_fut = self.schedule_pending_queue(element, pending_queue);
gst_log!(SINK_CAT, obj: element, "Scheduling pending queue now");
pending_queue.scheduled = true;
let wait_fut = self.schedule_pending_queue(element);
Some(wait_fut)
} else {
gst_log!(SINK_CAT, obj: element, "Scheduling pending queue later");
@ -624,15 +614,13 @@ impl ProxySink {
Ok(())
}
fn unprepare(&self, element: &gst::Element) -> Result<(), ()> {
fn unprepare(&self, element: &gst::Element) {
gst_debug!(SINK_CAT, obj: element, "Unpreparing");
*self.proxy_ctx.lock().unwrap() = None;
gst_debug!(SINK_CAT, obj: element, "Unprepared");
Ok(())
}
fn start(&self, element: &gst::Element) -> Result<(), ()> {
fn start(&self, element: &gst::Element) {
let proxy_ctx = self.proxy_ctx.lock().unwrap();
let mut shared_ctx = proxy_ctx.as_ref().unwrap().lock_shared();
@ -647,11 +635,9 @@ impl ProxySink {
shared_ctx.last_res = Ok(gst::FlowSuccess::Ok);
gst_debug!(SINK_CAT, obj: element, "Started");
Ok(())
}
fn stop(&self, element: &gst::Element) -> Result<(), ()> {
fn stop(&self, element: &gst::Element) {
let proxy_ctx = self.proxy_ctx.lock().unwrap();
let mut shared_ctx = proxy_ctx.as_ref().unwrap().lock_shared();
@ -661,8 +647,6 @@ impl ProxySink {
shared_ctx.last_res = Err(gst::FlowError::Flushing);
gst_debug!(SINK_CAT, obj: element, "Stopped");
Ok(())
}
}
@ -762,10 +746,10 @@ impl ElementImpl for ProxySink {
})?;
}
gst::StateChange::PausedToReady => {
self.stop(element).map_err(|_| gst::StateChangeError)?;
self.stop(element);
}
gst::StateChange::ReadyToNull => {
self.unprepare(element).map_err(|_| gst::StateChangeError)?;
self.unprepare(element);
}
_ => (),
}
@ -773,7 +757,7 @@ impl ElementImpl for ProxySink {
let success = self.parent_change_state(element, transition)?;
if transition == gst::StateChange::ReadyToPaused {
self.start(element).map_err(|_| gst::StateChangeError)?;
self.start(element);
}
Ok(success)
@ -792,7 +776,7 @@ impl ProxySrcPadHandler {
{
let proxy_ctx = proxysrc.proxy_ctx.lock().unwrap();
let mut shared_ctx = proxy_ctx.as_ref().unwrap().lock_shared();
if let Some(ref mut pending_queue) = shared_ctx.pending_queue {
if let Some(pending_queue) = shared_ctx.pending_queue.as_mut() {
pending_queue.notify_more_queue_space();
}
}
@ -822,7 +806,7 @@ impl PadSrcHandler for ProxySrcPadHandler {
&self,
pad: &PadSrcRef,
proxysrc: &ProxySrc,
element: &gst::Element,
_element: &gst::Element,
event: gst::Event,
) -> bool {
use gst::EventView;
@ -841,8 +825,8 @@ impl PadSrcHandler for ProxySrcPadHandler {
};
match event.view() {
EventView::FlushStart(..) => proxysrc.stop(element).unwrap(),
EventView::FlushStop(..) => proxysrc.flush_stop(element),
EventView::FlushStart(..) => proxysrc.task.flush_start(),
EventView::FlushStop(..) => proxysrc.task.flush_stop(),
_ => (),
}
@ -903,6 +887,138 @@ impl PadSrcHandler for ProxySrcPadHandler {
}
}
#[derive(Debug)]
struct ProxySrcTask {
element: gst::Element,
src_pad: PadSrcWeak,
dataqueue: DataQueue,
}
impl ProxySrcTask {
fn new(element: &gst::Element, src_pad: &PadSrc, dataqueue: DataQueue) -> Self {
ProxySrcTask {
element: element.clone(),
src_pad: src_pad.downgrade(),
dataqueue,
}
}
}
impl TaskImpl for ProxySrcTask {
fn start(&mut self) -> BoxFuture<'_, ()> {
async move {
gst_log!(SRC_CAT, obj: &self.element, "Starting task");
let proxysrc = ProxySrc::from_instance(&self.element);
let proxy_ctx = proxysrc.proxy_ctx.lock().unwrap();
let mut shared_ctx = proxy_ctx.as_ref().unwrap().lock_shared();
shared_ctx.last_res = Ok(gst::FlowSuccess::Ok);
if let Some(pending_queue) = shared_ctx.pending_queue.as_mut() {
pending_queue.notify_more_queue_space();
}
self.dataqueue.start();
gst_log!(SRC_CAT, obj: &self.element, "Task started");
}
.boxed()
}
fn iterate(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> {
async move {
let item = self.dataqueue.next().await;
let item = match item {
Some(item) => item,
None => {
gst_log!(SRC_CAT, obj: &self.element, "DataQueue Stopped");
return Err(gst::FlowError::Flushing);
}
};
let pad = self.src_pad.upgrade().expect("PadSrc no longer exists");
let proxysrc = ProxySrc::from_instance(&self.element);
let res = ProxySrcPadHandler::push_item(&pad, &proxysrc, item).await;
match res {
Ok(()) => {
gst_log!(SRC_CAT, obj: &self.element, "Successfully pushed item");
let proxy_ctx = proxysrc.proxy_ctx.lock().unwrap();
let mut shared_ctx = proxy_ctx.as_ref().unwrap().lock_shared();
shared_ctx.last_res = Ok(gst::FlowSuccess::Ok);
}
Err(gst::FlowError::Flushing) => {
gst_debug!(SRC_CAT, obj: &self.element, "Flushing");
let proxy_ctx = proxysrc.proxy_ctx.lock().unwrap();
let mut shared_ctx = proxy_ctx.as_ref().unwrap().lock_shared();
shared_ctx.last_res = Err(gst::FlowError::Flushing);
}
Err(gst::FlowError::Eos) => {
gst_debug!(SRC_CAT, obj: &self.element, "EOS");
let proxy_ctx = proxysrc.proxy_ctx.lock().unwrap();
let mut shared_ctx = proxy_ctx.as_ref().unwrap().lock_shared();
shared_ctx.last_res = Err(gst::FlowError::Eos);
}
Err(err) => {
gst_error!(SRC_CAT, obj: &self.element, "Got error {}", err);
gst_element_error!(
&self.element,
gst::StreamError::Failed,
("Internal data stream error"),
["streaming stopped, reason {}", err]
);
let proxy_ctx = proxysrc.proxy_ctx.lock().unwrap();
let mut shared_ctx = proxy_ctx.as_ref().unwrap().lock_shared();
shared_ctx.last_res = Err(err);
}
}
res
}
.boxed()
}
fn stop(&mut self) -> BoxFuture<'_, ()> {
async move {
gst_log!(SRC_CAT, obj: &self.element, "Stopping task");
let proxysrc = ProxySrc::from_instance(&self.element);
let proxy_ctx = proxysrc.proxy_ctx.lock().unwrap();
let mut shared_ctx = proxy_ctx.as_ref().unwrap().lock_shared();
self.dataqueue.clear();
self.dataqueue.stop();
shared_ctx.last_res = Err(gst::FlowError::Flushing);
if let Some(mut pending_queue) = shared_ctx.pending_queue.take() {
pending_queue.notify_more_queue_space();
}
gst_log!(SRC_CAT, obj: &self.element, "Task stopped");
}
.boxed()
}
fn flush_start(&mut self) -> BoxFuture<'_, ()> {
async move {
gst_log!(SRC_CAT, obj: &self.element, "Starting task flush");
let proxysrc = ProxySrc::from_instance(&self.element);
let proxy_ctx = proxysrc.proxy_ctx.lock().unwrap();
let mut shared_ctx = proxy_ctx.as_ref().unwrap().lock_shared();
self.dataqueue.clear();
shared_ctx.last_res = Err(gst::FlowError::Flushing);
gst_log!(SRC_CAT, obj: &self.element, "Task flush started");
}
.boxed()
}
}
#[derive(Debug)]
struct ProxySrc {
src_pad: PadSrc,
@ -971,21 +1087,23 @@ impl ProxySrc {
*self.proxy_ctx.lock().unwrap() = Some(proxy_ctx);
*self.dataqueue.lock().unwrap() = Some(dataqueue);
*self.dataqueue.lock().unwrap() = Some(dataqueue.clone());
self.task.prepare(ts_ctx).map_err(|err| {
gst_error_msg!(
gst::ResourceError::OpenRead,
["Error preparing Task: {:?}", err]
)
})?;
self.task
.prepare(ProxySrcTask::new(element, &self.src_pad, dataqueue), ts_ctx)
.map_err(|err| {
gst_error_msg!(
gst::ResourceError::OpenRead,
["Error preparing Task: {:?}", err]
)
})?;
gst_debug!(SRC_CAT, obj: element, "Prepared");
Ok(())
}
fn unprepare(&self, element: &gst::Element) -> Result<(), ()> {
fn unprepare(&self, element: &gst::Element) {
gst_debug!(SRC_CAT, obj: element, "Unpreparing");
{
@ -1000,154 +1118,24 @@ impl ProxySrc {
*self.proxy_ctx.lock().unwrap() = None;
gst_debug!(SRC_CAT, obj: element, "Unprepared");
Ok(())
}
fn stop(&self, element: &gst::Element) -> Result<(), ()> {
// Keep the lock on the `dataqueue` until `stop` is complete
// so as to prevent race conditions due to concurrent FlushStart/Stop.
// Note that this won't deadlock as `ProxySrc::dataqueue` guards a pointer to
// the `dataqueue` used within the `src_pad`'s `Task`.
let dataqueue = self.dataqueue.lock().unwrap();
fn stop(&self, element: &gst::Element) {
gst_debug!(SRC_CAT, obj: element, "Stopping");
self.task.stop();
let dataqueue = dataqueue.as_ref().unwrap();
dataqueue.clear();
dataqueue.stop();
gst_debug!(SRC_CAT, obj: element, "Stopped");
Ok(())
}
fn start(&self, element: &gst::Element) -> Result<(), ()> {
let dataqueue = self.dataqueue.lock().unwrap();
let dataqueue = dataqueue.as_ref().unwrap();
if dataqueue.state() == DataQueueState::Started {
gst_debug!(SRC_CAT, obj: element, "Already started");
return Ok(());
}
fn start(&self, element: &gst::Element) {
gst_debug!(SRC_CAT, obj: element, "Starting");
self.start_unchecked(element, dataqueue);
self.task.start();
gst_debug!(SRC_CAT, obj: element, "Started");
Ok(())
}
fn start_unchecked(&self, element: &gst::Element, dataqueue: &DataQueue) {
dataqueue.start();
{
let proxy_ctx = self.proxy_ctx.lock().unwrap();
let mut shared_ctx = proxy_ctx.as_ref().unwrap().lock_shared();
if let Some(pending_queue) = shared_ctx.pending_queue.as_mut() {
pending_queue.notify_more_queue_space();
}
}
self.start_task(element, dataqueue);
}
fn start_task(&self, element: &gst::Element, dataqueue: &DataQueue) {
let pad_weak = self.src_pad.downgrade();
let dataqueue = dataqueue.clone();
let element = element.clone();
self.task.start(move || {
let pad_weak = pad_weak.clone();
let mut dataqueue = dataqueue.clone();
let element = element.clone();
async move {
let item = dataqueue.next().await;
let pad = pad_weak.upgrade().expect("PadSrc no longer exists");
let item = match item {
Some(item) => item,
None => {
gst_log!(SRC_CAT, obj: pad.gst_pad(), "DataQueue Stopped or Paused");
return glib::Continue(false);
}
};
let proxysrc = ProxySrc::from_instance(&element);
match ProxySrcPadHandler::push_item(&pad, &proxysrc, item).await {
Ok(_) => {
gst_log!(SRC_CAT, obj: pad.gst_pad(), "Successfully pushed item");
let proxy_ctx = proxysrc.proxy_ctx.lock().unwrap();
let mut shared_ctx = proxy_ctx.as_ref().unwrap().lock_shared();
shared_ctx.last_res = Ok(gst::FlowSuccess::Ok);
glib::Continue(true)
}
Err(gst::FlowError::Flushing) => {
gst_debug!(SRC_CAT, obj: pad.gst_pad(), "Flushing");
let proxy_ctx = proxysrc.proxy_ctx.lock().unwrap();
let mut shared_ctx = proxy_ctx.as_ref().unwrap().lock_shared();
shared_ctx.last_res = Err(gst::FlowError::Flushing);
glib::Continue(false)
}
Err(gst::FlowError::Eos) => {
gst_debug!(SRC_CAT, obj: pad.gst_pad(), "EOS");
let proxy_ctx = proxysrc.proxy_ctx.lock().unwrap();
let mut shared_ctx = proxy_ctx.as_ref().unwrap().lock_shared();
shared_ctx.last_res = Err(gst::FlowError::Eos);
glib::Continue(false)
}
Err(err) => {
gst_error!(SRC_CAT, obj: pad.gst_pad(), "Got error {}", err);
gst_element_error!(
element,
gst::StreamError::Failed,
("Internal data stream error"),
["streaming stopped, reason {}", err]
);
let proxy_ctx = proxysrc.proxy_ctx.lock().unwrap();
let mut shared_ctx = proxy_ctx.as_ref().unwrap().lock_shared();
shared_ctx.last_res = Err(err);
glib::Continue(false)
}
}
}
});
}
fn flush_stop(&self, element: &gst::Element) {
// Keep the lock on the `dataqueue` until `flush_stop` is complete
// so as to prevent race conditions due to concurrent state transitions.
// Note that this won't deadlock as `ProxySrc::dataqueue` guards a pointer to
// the `dataqueue` used within the `src_pad`'s `Task`.
let dataqueue = self.dataqueue.lock().unwrap();
let dataqueue = dataqueue.as_ref().unwrap();
if dataqueue.state() == DataQueueState::Started {
gst_debug!(SRC_CAT, obj: element, "Already started");
return;
}
gst_debug!(SRC_CAT, obj: element, "Stopping Flush");
self.task.stop();
self.start_unchecked(element, dataqueue);
gst_debug!(SRC_CAT, obj: element, "Stopped Flush");
}
fn pause(&self, element: &gst::Element) -> Result<(), ()> {
let dataqueue = self.dataqueue.lock().unwrap();
fn pause(&self, element: &gst::Element) {
gst_debug!(SRC_CAT, obj: element, "Pausing");
dataqueue.as_ref().unwrap().pause();
self.task.pause();
gst_debug!(SRC_CAT, obj: element, "Paused");
Ok(())
}
}
@ -1276,10 +1264,10 @@ impl ElementImpl for ProxySrc {
})?;
}
gst::StateChange::PlayingToPaused => {
self.pause(element).map_err(|_| gst::StateChangeError)?;
self.pause(element);
}
gst::StateChange::ReadyToNull => {
self.unprepare(element).map_err(|_| gst::StateChangeError)?;
self.unprepare(element);
}
_ => (),
}
@ -1291,13 +1279,13 @@ impl ElementImpl for ProxySrc {
success = gst::StateChangeSuccess::NoPreroll;
}
gst::StateChange::PausedToPlaying => {
self.start(element).map_err(|_| gst::StateChangeError)?;
self.start(element);
}
gst::StateChange::PlayingToPaused => {
success = gst::StateChangeSuccess::NoPreroll;
}
gst::StateChange::PausedToReady => {
self.stop(element).map_err(|_| gst::StateChangeError)?;
self.stop(element);
}
_ => (),
}

View file

@ -35,9 +35,9 @@ use std::sync::Mutex as StdMutex;
use std::{u32, u64};
use crate::runtime::prelude::*;
use crate::runtime::{Context, PadSink, PadSinkRef, PadSrc, PadSrcRef, Task};
use crate::runtime::{Context, PadSink, PadSinkRef, PadSrc, PadSrcRef, PadSrcWeak, Task};
use super::dataqueue::{DataQueue, DataQueueItem, DataQueueState};
use super::dataqueue::{DataQueue, DataQueueItem};
const DEFAULT_MAX_SIZE_BUFFERS: u32 = 200;
const DEFAULT_MAX_SIZE_BYTES: u32 = 1024 * 1024;
@ -185,7 +185,7 @@ impl PadSinkHandler for QueuePadSinkHandler {
&self,
pad: &PadSinkRef,
queue: &Queue,
element: &gst::Element,
_element: &gst::Element,
event: gst::Event,
) -> bool {
use gst::EventView;
@ -193,7 +193,7 @@ impl PadSinkHandler for QueuePadSinkHandler {
gst_debug!(CAT, obj: pad.gst_pad(), "Handling non-serialized {:?}", event);
if let EventView::FlushStart(..) = event.view() {
queue.stop(&element).unwrap();
queue.task.flush_start();
}
gst_log!(CAT, obj: pad.gst_pad(), "Forwarding non-serialized {:?}", event);
@ -218,7 +218,7 @@ impl PadSinkHandler for QueuePadSinkHandler {
let queue = Queue::from_instance(&element);
if let EventView::FlushStop(..) = event.view() {
queue.flush_stop(&element);
queue.task.flush_stop();
}
gst_log!(CAT, obj: pad.gst_pad(), "Queuing serialized {:?}", event);
@ -256,11 +256,9 @@ struct QueuePadSrcHandler;
impl QueuePadSrcHandler {
async fn push_item(
pad: &PadSrcRef<'_>,
element: &gst::Element,
queue: &Queue,
item: DataQueueItem,
) -> Result<(), gst::FlowError> {
let queue = Queue::from_instance(element);
if let Some(pending_queue) = queue.pending_queue.lock().unwrap().as_mut() {
pending_queue.notify_more_queue_space();
}
@ -290,7 +288,7 @@ impl PadSrcHandler for QueuePadSrcHandler {
&self,
pad: &PadSrcRef,
queue: &Queue,
element: &gst::Element,
_element: &gst::Element,
event: gst::Event,
) -> bool {
use gst::EventView;
@ -298,8 +296,8 @@ impl PadSrcHandler for QueuePadSrcHandler {
gst_log!(CAT, obj: pad.gst_pad(), "Handling {:?}", event);
match event.view() {
EventView::FlushStart(..) => queue.stop(element).unwrap(),
EventView::FlushStop(..) => queue.flush_stop(element),
EventView::FlushStart(..) => queue.task.flush_start(),
EventView::FlushStop(..) => queue.task.flush_stop(),
_ => (),
}
@ -346,6 +344,129 @@ impl PadSrcHandler for QueuePadSrcHandler {
}
}
#[derive(Debug)]
struct QueueTask {
element: gst::Element,
src_pad: PadSrcWeak,
dataqueue: DataQueue,
}
impl QueueTask {
fn new(element: &gst::Element, src_pad: &PadSrc, dataqueue: DataQueue) -> Self {
QueueTask {
element: element.clone(),
src_pad: src_pad.downgrade(),
dataqueue,
}
}
}
impl TaskImpl for QueueTask {
fn start(&mut self) -> BoxFuture<'_, ()> {
async move {
gst_log!(CAT, obj: &self.element, "Starting task");
let queue = Queue::from_instance(&self.element);
let mut last_res = queue.last_res.lock().unwrap();
self.dataqueue.start();
*last_res = Ok(gst::FlowSuccess::Ok);
gst_log!(CAT, obj: &self.element, "Task started");
}
.boxed()
}
fn iterate(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> {
async move {
let item = self.dataqueue.next().await;
let item = match item {
Some(item) => item,
None => {
gst_log!(CAT, obj: &self.element, "DataQueue Stopped");
return Err(gst::FlowError::Flushing);
}
};
let pad = self.src_pad.upgrade().expect("PadSrc no longer exists");
let queue = Queue::from_instance(&self.element);
let res = QueuePadSrcHandler::push_item(&pad, &queue, item).await;
match res {
Ok(()) => {
gst_log!(CAT, obj: &self.element, "Successfully pushed item");
*queue.last_res.lock().unwrap() = Ok(gst::FlowSuccess::Ok);
}
Err(gst::FlowError::Flushing) => {
gst_debug!(CAT, obj: &self.element, "Flushing");
*queue.last_res.lock().unwrap() = Err(gst::FlowError::Flushing);
}
Err(gst::FlowError::Eos) => {
gst_debug!(CAT, obj: &self.element, "EOS");
*queue.last_res.lock().unwrap() = Err(gst::FlowError::Eos);
let eos = gst::Event::new_eos().build();
pad.push_event(eos).await;
}
Err(err) => {
gst_error!(CAT, obj: &self.element, "Got error {}", err);
gst_element_error!(
&self.element,
gst::StreamError::Failed,
("Internal data stream error"),
["streaming stopped, reason {}", err]
);
*queue.last_res.lock().unwrap() = Err(err);
}
}
res
}
.boxed()
}
fn stop(&mut self) -> BoxFuture<'_, ()> {
async move {
gst_log!(CAT, obj: &self.element, "Stopping task");
let queue = Queue::from_instance(&self.element);
let mut last_res = queue.last_res.lock().unwrap();
self.dataqueue.stop();
self.dataqueue.clear();
if let Some(mut pending_queue) = queue.pending_queue.lock().unwrap().take() {
pending_queue.notify_more_queue_space();
}
*last_res = Err(gst::FlowError::Flushing);
gst_log!(CAT, obj: &self.element, "Task stopped");
}
.boxed()
}
fn flush_start(&mut self) -> BoxFuture<'_, ()> {
async move {
gst_log!(CAT, obj: &self.element, "Starting task flush");
let queue = Queue::from_instance(&self.element);
let mut last_res = queue.last_res.lock().unwrap();
self.dataqueue.clear();
if let Some(mut pending_queue) = queue.pending_queue.lock().unwrap().take() {
pending_queue.notify_more_queue_space();
}
*last_res = Err(gst::FlowError::Flushing);
gst_log!(CAT, obj: &self.element, "Task flush started");
}
.boxed()
}
}
#[derive(Debug)]
struct Queue {
sink_pad: PadSink,
@ -405,57 +526,44 @@ impl Queue {
/* Schedules emptying of the pending queue. If there is an upstream
* TaskContext, the new task is spawned, it is otherwise
* returned, for the caller to block on */
fn schedule_pending_queue(
&self,
element: &gst::Element,
pending_queue: &mut Option<PendingQueue>,
) -> impl Future<Output = ()> {
gst_log!(CAT, obj: element, "Scheduling pending queue now");
async fn schedule_pending_queue(&self, element: &gst::Element) {
loop {
let more_queue_space_receiver = {
let dataqueue = self.dataqueue.lock().unwrap();
if dataqueue.is_none() {
return;
}
let mut pending_queue_grd = self.pending_queue.lock().unwrap();
pending_queue.as_mut().unwrap().scheduled = true;
gst_log!(CAT, obj: element, "Trying to empty pending queue");
let element = element.clone();
async move {
let queue = Self::from_instance(&element);
loop {
let more_queue_space_receiver = {
let dataqueue = queue.dataqueue.lock().unwrap();
if dataqueue.is_none() {
return;
if let Some(pending_queue) = pending_queue_grd.as_mut() {
let mut failed_item = None;
while let Some(item) = pending_queue.items.pop_front() {
if let Err(item) = dataqueue.as_ref().unwrap().push(item) {
failed_item = Some(item);
}
}
let mut pending_queue_grd = queue.pending_queue.lock().unwrap();
gst_log!(CAT, obj: &element, "Trying to empty pending queue");
if let Some(failed_item) = failed_item {
pending_queue.items.push_front(failed_item);
let (sender, receiver) = oneshot::channel();
pending_queue.more_queue_space_sender = Some(sender);
if let Some(pending_queue) = pending_queue_grd.as_mut() {
let mut failed_item = None;
while let Some(item) = pending_queue.items.pop_front() {
if let Err(item) = dataqueue.as_ref().unwrap().push(item) {
failed_item = Some(item);
}
}
if let Some(failed_item) = failed_item {
pending_queue.items.push_front(failed_item);
let (sender, receiver) = oneshot::channel();
pending_queue.more_queue_space_sender = Some(sender);
receiver
} else {
gst_log!(CAT, obj: &element, "Pending queue is empty now");
*pending_queue_grd = None;
return;
}
receiver
} else {
gst_log!(CAT, obj: &element, "Flushing, dropping pending queue");
gst_log!(CAT, obj: element, "Pending queue is empty now");
*pending_queue_grd = None;
return;
}
};
} else {
gst_log!(CAT, obj: element, "Flushing, dropping pending queue");
return;
}
};
gst_log!(CAT, obj: &element, "Waiting for more queue space");
let _ = more_queue_space_receiver.await;
}
gst_log!(CAT, obj: element, "Waiting for more queue space");
let _ = more_queue_space_receiver.await;
}
}
@ -503,7 +611,10 @@ impl Queue {
);
if schedule_now {
let wait_fut = self.schedule_pending_queue(element, &mut pending_queue);
gst_log!(CAT, obj: element, "Scheduling pending queue now");
pending_queue.as_mut().unwrap().scheduled = true;
let wait_fut = self.schedule_pending_queue(element);
Some(wait_fut)
} else {
gst_log!(CAT, obj: element, "Scheduling pending queue later");
@ -551,7 +662,7 @@ impl Queue {
},
);
*self.dataqueue.lock().unwrap() = Some(dataqueue);
*self.dataqueue.lock().unwrap() = Some(dataqueue.clone());
let context =
Context::acquire(&settings.context, settings.context_wait).map_err(|err| {
@ -561,19 +672,21 @@ impl Queue {
)
})?;
self.task.prepare(context).map_err(|err| {
gst_error_msg!(
gst::ResourceError::OpenRead,
["Error preparing Task: {:?}", err]
)
})?;
self.task
.prepare(QueueTask::new(element, &self.src_pad, dataqueue), context)
.map_err(|err| {
gst_error_msg!(
gst::ResourceError::OpenRead,
["Error preparing Task: {:?}", err]
)
})?;
gst_debug!(CAT, obj: element, "Prepared");
Ok(())
}
fn unprepare(&self, element: &gst::Element) -> Result<(), ()> {
fn unprepare(&self, element: &gst::Element) {
gst_debug!(CAT, obj: element, "Unpreparing");
self.task.unprepare().unwrap();
@ -584,129 +697,18 @@ impl Queue {
*self.last_res.lock().unwrap() = Ok(gst::FlowSuccess::Ok);
gst_debug!(CAT, obj: element, "Unprepared");
Ok(())
}
fn stop(&self, element: &gst::Element) -> Result<(), ()> {
let dataqueue = self.dataqueue.lock().unwrap();
fn stop(&self, element: &gst::Element) {
gst_debug!(CAT, obj: element, "Stopping");
*self.last_res.lock().unwrap() = Err(gst::FlowError::Flushing);
self.task.stop();
if let Some(dataqueue) = dataqueue.as_ref() {
dataqueue.pause();
dataqueue.clear();
dataqueue.stop();
}
if let Some(mut pending_queue) = self.pending_queue.lock().unwrap().take() {
pending_queue.notify_more_queue_space();
}
gst_debug!(CAT, obj: element, "Stopped");
Ok(())
}
fn start(&self, element: &gst::Element) -> Result<(), ()> {
let dataqueue = self.dataqueue.lock().unwrap();
let dataqueue = dataqueue.as_ref().unwrap();
if dataqueue.state() == DataQueueState::Started {
gst_debug!(CAT, obj: element, "Already started");
return Ok(());
}
fn start(&self, element: &gst::Element) {
gst_debug!(CAT, obj: element, "Starting");
dataqueue.start();
*self.last_res.lock().unwrap() = Ok(gst::FlowSuccess::Ok);
self.start_task(element, dataqueue);
self.task.start();
gst_debug!(CAT, obj: element, "Started");
Ok(())
}
fn start_task(&self, element: &gst::Element, dataqueue: &DataQueue) {
let pad_weak = self.src_pad.downgrade();
let dataqueue = dataqueue.clone();
let element = element.clone();
self.task.start(move || {
let pad_weak = pad_weak.clone();
let mut dataqueue = dataqueue.clone();
let element = element.clone();
async move {
let item = dataqueue.next().await;
let pad = pad_weak.upgrade().expect("PadSrc no longer exists");
let item = match item {
Some(item) => item,
None => {
gst_log!(CAT, obj: pad.gst_pad(), "DataQueue Stopped");
return glib::Continue(false);
}
};
let queue = Queue::from_instance(&element);
match QueuePadSrcHandler::push_item(&pad, &element, item).await {
Ok(()) => {
gst_log!(CAT, obj: pad.gst_pad(), "Successfully pushed item");
*queue.last_res.lock().unwrap() = Ok(gst::FlowSuccess::Ok);
glib::Continue(true)
}
Err(gst::FlowError::Flushing) => {
gst_debug!(CAT, obj: pad.gst_pad(), "Flushing");
*queue.last_res.lock().unwrap() = Err(gst::FlowError::Flushing);
glib::Continue(false)
}
Err(gst::FlowError::Eos) => {
gst_debug!(CAT, obj: pad.gst_pad(), "EOS");
*queue.last_res.lock().unwrap() = Err(gst::FlowError::Eos);
let eos = gst::Event::new_eos().build();
pad.push_event(eos).await;
glib::Continue(false)
}
Err(err) => {
gst_error!(CAT, obj: pad.gst_pad(), "Got error {}", err);
gst_element_error!(
element,
gst::StreamError::Failed,
("Internal data stream error"),
["streaming stopped, reason {}", err]
);
*queue.last_res.lock().unwrap() = Err(err);
glib::Continue(false)
}
}
}
});
}
fn flush_stop(&self, element: &gst::Element) {
// Keep the lock on the `dataqueue` until `flush_stop` is complete
// so as to prevent race conditions due to concurrent state transitions.
// Note that this won't deadlock as `Queue::dataqueue` guards a pointer to
// the `dataqueue` used within the `src_pad`'s `Task`.
let dataqueue = self.dataqueue.lock().unwrap();
let dataqueue = dataqueue.as_ref().unwrap();
if dataqueue.state() == DataQueueState::Started {
gst_debug!(CAT, obj: element, "Already started");
return;
}
gst_debug!(CAT, obj: element, "Stopping Flush");
dataqueue.start();
self.start_task(element, dataqueue);
gst_debug!(CAT, obj: element, "Stopped Flush");
}
}
@ -837,10 +839,10 @@ impl ElementImpl for Queue {
})?;
}
gst::StateChange::PausedToReady => {
self.stop(element).map_err(|_| gst::StateChangeError)?;
self.stop(element);
}
gst::StateChange::ReadyToNull => {
self.unprepare(element).map_err(|_| gst::StateChangeError)?;
self.unprepare(element);
}
_ => (),
}
@ -848,7 +850,7 @@ impl ElementImpl for Queue {
let success = self.parent_change_state(element, transition)?;
if transition == gst::StateChange::ReadyToPaused {
self.start(element).map_err(|_| gst::StateChangeError)?;
self.start(element);
}
Ok(success)

View file

@ -488,6 +488,23 @@ impl Context {
}
pub fn spawn<Fut>(&self, future: Fut) -> JoinHandle<Fut::Output>
where
Fut: Future + Send + 'static,
Fut::Output: Send + 'static,
{
self.spawn_internal(future, false)
}
pub fn awake_and_spawn<Fut>(&self, future: Fut) -> JoinHandle<Fut::Output>
where
Fut: Future + Send + 'static,
Fut::Output: Send + 'static,
{
self.spawn_internal(future, true)
}
#[inline]
fn spawn_internal<Fut>(&self, future: Fut, must_awake: bool) -> JoinHandle<Fut::Output>
where
Fut: Future + Send + 'static,
Fut::Output: Send + 'static,
@ -510,7 +527,7 @@ impl Context {
real.name
);
let join_handle = real.handle.lock().unwrap().spawn(async move {
let spawn_fut = async move {
let ctx = Context::current().unwrap();
let real = ctx.0.real.as_ref().unwrap();
@ -542,7 +559,15 @@ impl Context {
gst_trace!(RUNTIME_CAT, "Task {:?} on context {} done", id, real.name);
res
});
};
let join_handle = {
if must_awake {
real.handle.lock().unwrap().awake_and_spawn(spawn_fut)
} else {
real.handle.lock().unwrap().spawn(spawn_fut)
}
};
JoinHandle {
join_handle,

View file

@ -54,6 +54,7 @@ pub use task::{Task, TaskState};
pub mod prelude {
pub use super::pad::{PadSinkHandler, PadSrcHandler};
pub use super::task::TaskImpl;
}
pub mod time;

File diff suppressed because it is too large Load diff

View file

@ -16,21 +16,22 @@
// Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
// Boston, MA 02110-1335, USA.
use futures::future::{abortable, AbortHandle, Aborted, BoxFuture};
use futures::prelude::*;
use futures::future::BoxFuture;
use gst::prelude::*;
use gst::{gst_debug, gst_error, gst_error_msg};
use gst::{gst_debug, gst_error, gst_error_msg, gst_log};
use lazy_static::lazy_static;
use std::io;
use std::sync::{Arc, Mutex};
use gio::prelude::*;
use gio_sys as gio_ffi;
use gobject_sys as gobject_ffi;
use std::error;
use std::fmt;
#[cfg(unix)]
use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
@ -45,244 +46,106 @@ lazy_static! {
);
}
pub struct Socket<T: SocketRead + 'static>(Arc<Mutex<SocketInner<T>>>);
pub trait SocketRead: Send + Unpin {
const DO_TIMESTAMP: bool;
fn read<'buf>(
&self,
&'buf mut self,
buffer: &'buf mut [u8],
) -> BoxFuture<'buf, io::Result<(usize, Option<std::net::SocketAddr>)>>;
}
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum SocketState {
Paused,
Prepared,
Started,
Unprepared,
}
struct SocketInner<T: SocketRead + 'static> {
state: SocketState,
pub struct Socket<T: SocketRead> {
element: gst::Element,
buffer_pool: gst::BufferPool,
reader: T,
mapped_buffer: Option<gst::MappedBuffer<gst::buffer::Writable>>,
clock: Option<gst::Clock>,
base_time: Option<gst::ClockTime>,
create_read_handle: Option<AbortHandle>,
create_reader_fut: Option<BoxFuture<'static, Result<T, SocketError>>>,
read_handle: Option<AbortHandle>,
reader: Option<T>,
}
impl<T: SocketRead + 'static> Socket<T> {
pub fn new<F>(
element: &gst::Element,
impl<T: SocketRead> Socket<T> {
pub fn try_new(
element: gst::Element,
buffer_pool: gst::BufferPool,
create_reader_fut: F,
) -> Result<Self, ()>
where
F: Future<Output = Result<T, SocketError>> + Send + 'static,
{
let socket = Socket(Arc::new(Mutex::new(SocketInner::<T> {
state: SocketState::Unprepared,
element: element.clone(),
reader: T,
) -> Result<Self, ()> {
// FIXME couldn't we just delegate this to caller?
buffer_pool.set_active(true).map_err(|err| {
gst_error!(
SOCKET_CAT,
obj: &element,
"Failed to prepare socket: {}",
err
);
})?;
Ok(Socket::<T> {
buffer_pool,
element,
reader,
mapped_buffer: None,
clock: None,
base_time: None,
create_read_handle: None,
create_reader_fut: Some(create_reader_fut.boxed()),
read_handle: None,
reader: None,
})));
let mut inner = socket.0.lock().unwrap();
if inner.state != SocketState::Unprepared {
gst_debug!(SOCKET_CAT, obj: &inner.element, "Socket already prepared");
return Err(());
}
gst_debug!(SOCKET_CAT, obj: &inner.element, "Preparing socket");
inner.buffer_pool.set_active(true).map_err(|err| {
gst_error!(SOCKET_CAT, obj: &inner.element, "Failed to prepare socket: {}", err);
})?;
inner.state = SocketState::Prepared;
drop(inner);
Ok(socket)
})
}
pub fn state(&self) -> SocketState {
self.0.lock().unwrap().state
}
pub fn start(
&self,
clock: Option<gst::Clock>,
base_time: Option<gst::ClockTime>,
) -> Result<SocketStream<T>, ()> {
// Paused->Playing
let mut inner = self.0.lock().unwrap();
assert_ne!(SocketState::Unprepared, inner.state);
if inner.state == SocketState::Started {
gst_debug!(SOCKET_CAT, obj: &inner.element, "Socket already started");
return Err(());
}
gst_debug!(SOCKET_CAT, obj: &inner.element, "Starting socket");
inner.clock = clock;
inner.base_time = base_time;
inner.state = SocketState::Started;
Ok(SocketStream::<T>::new(self))
}
pub fn pause(&self) {
// Playing->Paused
let mut inner = self.0.lock().unwrap();
assert_ne!(SocketState::Unprepared, inner.state);
if inner.state != SocketState::Started {
gst_debug!(SOCKET_CAT, obj: &inner.element, "Socket not started");
return;
}
gst_debug!(SOCKET_CAT, obj: &inner.element, "Pausing socket");
inner.clock = None;
inner.base_time = None;
inner.state = SocketState::Paused;
if let Some(read_handle) = inner.read_handle.take() {
read_handle.abort();
}
pub fn set_clock(&mut self, clock: Option<gst::Clock>, base_time: Option<gst::ClockTime>) {
self.clock = clock;
self.base_time = base_time;
}
}
impl<T: SocketRead> Drop for SocketInner<T> {
fn drop(&mut self) {
// Ready->Null
assert_ne!(SocketState::Started, self.state);
if self.state == SocketState::Unprepared {
gst_debug!(SOCKET_CAT, obj: &self.element, "Socket already unprepared");
return;
}
if let Some(create_read_handle_handle) = self.create_read_handle.take() {
create_read_handle_handle.abort();
}
if let Err(err) = self.buffer_pool.set_active(false) {
gst_error!(SOCKET_CAT, obj: &self.element, "Failed to unprepare socket: {}", err);
}
self.state = SocketState::Unprepared;
}
}
impl<T: SocketRead + Unpin + 'static> Clone for Socket<T> {
fn clone(&self) -> Self {
Socket::<T>(self.0.clone())
}
}
pub type SocketStreamItem = Result<(gst::Buffer, Option<std::net::SocketAddr>), SocketError>;
#[derive(Debug)]
pub enum SocketError {
Gst(gst::FlowError),
Io(io::Error),
}
pub struct SocketStream<T: SocketRead + 'static> {
socket: Socket<T>,
mapped_buffer: Option<gst::MappedBuffer<gst::buffer::Writable>>,
}
impl error::Error for SocketError {}
impl<T: SocketRead + 'static> SocketStream<T> {
fn new(socket: &Socket<T>) -> Self {
SocketStream {
socket: socket.clone(),
mapped_buffer: None,
impl fmt::Display for SocketError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
SocketError::Gst(err) => write!(f, "flow error: {}", err),
SocketError::Io(err) => write!(f, "IO error: {}", err),
}
}
}
pub type SocketStreamItem = Result<(gst::Buffer, Option<std::net::SocketAddr>), SocketError>;
impl<T: SocketRead> Socket<T> {
// Can't implement this as a Stream trait because we end up using things like
// tokio::net::UdpSocket which don't implement pollable functions.
#[allow(clippy::should_implement_trait)]
pub async fn next(&mut self) -> Option<SocketStreamItem> {
// First create if needed
let (create_reader_fut, element) = {
let mut inner = self.socket.0.lock().unwrap();
gst_log!(SOCKET_CAT, obj: &self.element, "Trying to read data");
if let Some(create_reader_fut) = inner.create_reader_fut.take() {
let (create_reader_fut, abort_handle) = abortable(create_reader_fut);
inner.create_read_handle = Some(abort_handle);
(Some(create_reader_fut), inner.element.clone())
} else {
(None, inner.element.clone())
}
};
if let Some(create_reader_fut) = create_reader_fut {
match create_reader_fut.await {
Ok(Ok(read)) => {
let mut inner = self.socket.0.lock().unwrap();
inner.create_read_handle = None;
inner.reader = Some(read);
if self.mapped_buffer.is_none() {
match self.buffer_pool.acquire_buffer(None) {
Ok(buffer) => {
self.mapped_buffer = Some(buffer.into_mapped_buffer_writable().unwrap());
}
Ok(Err(err)) => {
gst_debug!(SOCKET_CAT, obj: &element, "Create reader error {:?}", err);
return Some(Err(err));
}
Err(Aborted) => {
gst_debug!(SOCKET_CAT, obj: &element, "Create reader Aborted");
return None;
Err(err) => {
gst_debug!(SOCKET_CAT, obj: &self.element, "Failed to acquire buffer {:?}", err);
return Some(Err(SocketError::Gst(err)));
}
}
}
// take the mapped_buffer before locking the socket so as to please the mighty borrow checker
let (read_fut, clock, base_time) = {
let mut inner = self.socket.0.lock().unwrap();
if inner.state != SocketState::Started {
gst_debug!(SOCKET_CAT, obj: &inner.element, "Socket is not Started");
return None;
}
let reader = match inner.reader {
None => {
gst_debug!(SOCKET_CAT, obj: &inner.element, "Have no reader");
return None;
}
Some(ref reader) => reader,
};
gst_debug!(SOCKET_CAT, obj: &inner.element, "Trying to read data");
if self.mapped_buffer.is_none() {
match inner.buffer_pool.acquire_buffer(None) {
Ok(buffer) => {
self.mapped_buffer = Some(buffer.into_mapped_buffer_writable().unwrap());
}
Err(err) => {
gst_debug!(SOCKET_CAT, obj: &inner.element, "Failed to acquire buffer {:?}", err);
return Some(Err(SocketError::Gst(err)));
}
}
}
let (read_fut, abort_handle) =
abortable(reader.read(self.mapped_buffer.as_mut().unwrap().as_mut_slice()));
inner.read_handle = Some(abort_handle);
(read_fut, inner.clock.clone(), inner.base_time)
};
match read_fut.await {
Ok(Ok((len, saddr))) => {
match self
.reader
.read(self.mapped_buffer.as_mut().unwrap().as_mut_slice())
.await
{
Ok((len, saddr)) => {
let dts = if T::DO_TIMESTAMP {
let time = clock.as_ref().unwrap().get_time();
let running_time = time - base_time.unwrap();
let time = self.clock.as_ref().unwrap().get_time();
let running_time = time - self.base_time.unwrap();
gst_debug!(
SOCKET_CAT,
obj: &element,
obj: &self.element,
"Read {} bytes at {} (clock {})",
len,
running_time,
@ -290,7 +153,7 @@ impl<T: SocketRead + 'static> SocketStream<T> {
);
running_time
} else {
gst_debug!(SOCKET_CAT, obj: &element, "Read {} bytes", len);
gst_debug!(SOCKET_CAT, obj: &self.element, "Read {} bytes", len);
gst::CLOCK_TIME_NONE
};
@ -305,16 +168,19 @@ impl<T: SocketRead + 'static> SocketStream<T> {
Some(Ok((buffer, saddr)))
}
Ok(Err(err)) => {
gst_debug!(SOCKET_CAT, obj: &element, "Read error {:?}", err);
Err(err) => {
gst_debug!(SOCKET_CAT, obj: &self.element, "Read error {:?}", err);
Some(Err(SocketError::Io(err)))
}
Err(Aborted) => {
gst_debug!(SOCKET_CAT, obj: &element, "Read Aborted");
}
}
}
None
}
impl<T: SocketRead> Drop for Socket<T> {
fn drop(&mut self) {
if let Err(err) = self.buffer_pool.set_active(false) {
gst_error!(SOCKET_CAT, obj: &self.element, "Failed to unprepare socket: {}", err);
}
}
}

View file

@ -41,9 +41,9 @@ use std::u32;
use tokio::io::AsyncReadExt;
use crate::runtime::prelude::*;
use crate::runtime::{Context, PadSrc, PadSrcRef, Task};
use crate::runtime::{Context, PadSrc, PadSrcRef, PadSrcWeak, Task};
use super::socket::{Socket, SocketError, SocketRead, SocketState};
use super::socket::{Socket, SocketError, SocketRead};
const DEFAULT_HOST: Option<&str> = Some("127.0.0.1");
const DEFAULT_PORT: i32 = 4953;
@ -138,15 +138,11 @@ static PROPERTIES: [subclass::Property; 6] = [
}),
];
struct TcpClientReaderInner {
socket: tokio::net::TcpStream,
}
pub struct TcpClientReader(Arc<FutMutex<TcpClientReaderInner>>);
pub struct TcpClientReader(tokio::net::TcpStream);
impl TcpClientReader {
pub fn new(socket: tokio::net::TcpStream) -> Self {
TcpClientReader(Arc::new(FutMutex::new(TcpClientReaderInner { socket })))
TcpClientReader(socket)
}
}
@ -154,20 +150,10 @@ impl SocketRead for TcpClientReader {
const DO_TIMESTAMP: bool = false;
fn read<'buf>(
&self,
&'buf mut self,
buffer: &'buf mut [u8],
) -> BoxFuture<'buf, io::Result<(usize, Option<std::net::SocketAddr>)>> {
let this = Arc::clone(&self.0);
async move {
this.lock()
.await
.socket
.read(buffer)
.await
.map(|read_size| (read_size, None))
}
.boxed()
async move { self.0.read(buffer).await.map(|read_size| (read_size, None)) }.boxed()
}
}
@ -206,16 +192,12 @@ impl TcpClientSrcPadHandler {
.caps = caps;
}
fn reset_state(&self) {
*self.0.state.try_lock().expect("State locked elsewhere") = Default::default();
async fn reset_state(&self) {
*self.0.configured_caps.lock().unwrap() = None;
}
fn set_need_segment(&self) {
self.0
.state
.try_lock()
.expect("State locked elsewhere")
.need_segment = true;
async fn set_need_segment(&self) {
self.0.state.lock().await.need_segment = true;
}
async fn push_prelude(&self, pad: &PadSrcRef<'_>, _element: &gst::Element) {
@ -274,7 +256,7 @@ impl PadSrcHandler for TcpClientSrcPadHandler {
&self,
pad: &PadSrcRef,
tcpclientsrc: &TcpClientSrc,
element: &gst::Element,
_element: &gst::Element,
event: gst::Event,
) -> bool {
use gst::EventView;
@ -283,12 +265,12 @@ impl PadSrcHandler for TcpClientSrcPadHandler {
let ret = match event.view() {
EventView::FlushStart(..) => {
tcpclientsrc.flush_start(element);
tcpclientsrc.task.flush_start();
true
}
EventView::FlushStop(..) => {
tcpclientsrc.flush_stop(element);
tcpclientsrc.task.flush_stop();
true
}
@ -354,11 +336,167 @@ impl PadSrcHandler for TcpClientSrcPadHandler {
}
}
struct TcpClientSrcTask {
element: gst::Element,
src_pad: PadSrcWeak,
src_pad_handler: TcpClientSrcPadHandler,
saddr: SocketAddr,
buffer_pool: Option<gst::BufferPool>,
socket: Option<Socket<TcpClientReader>>,
}
impl TcpClientSrcTask {
fn new(
element: &gst::Element,
src_pad: &PadSrc,
src_pad_handler: &TcpClientSrcPadHandler,
saddr: SocketAddr,
buffer_pool: gst::BufferPool,
) -> Self {
TcpClientSrcTask {
element: element.clone(),
src_pad: src_pad.downgrade(),
src_pad_handler: src_pad_handler.clone(),
saddr,
buffer_pool: Some(buffer_pool),
socket: None,
}
}
}
impl TaskImpl for TcpClientSrcTask {
fn prepare(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst_log!(CAT, obj: &self.element, "Preparing task connecting to {:?}", self.saddr);
let socket = tokio::net::TcpStream::connect(self.saddr)
.await
.map_err(|err| {
gst_error_msg!(
gst::ResourceError::OpenRead,
["Failed to connect to {:?}: {:?}", self.saddr, err]
)
})?;
self.socket = Some(
Socket::try_new(
self.element.clone(),
self.buffer_pool.take().unwrap(),
TcpClientReader::new(socket),
)
.map_err(|err| {
gst_error_msg!(
gst::ResourceError::OpenRead,
["Failed to prepare socket {:?}", err]
)
})?,
);
gst_log!(CAT, obj: &self.element, "Task prepared");
Ok(())
}
.boxed()
}
fn handle_prepare_error(&mut self, err: gst::ErrorMessage) -> BoxFuture<'_, ()> {
async move {
gst_error!(CAT, "Task preparation failed: {:?}", err);
self.element.post_error_message(&err);
}
.boxed()
}
fn iterate(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> {
async move {
let item = self.socket.as_mut().unwrap().next().await;
let buffer = match item {
Some(Ok((buffer, _))) => buffer,
Some(Err(err)) => {
gst_error!(CAT, obj: &self.element, "Got error {:?}", err);
match err {
SocketError::Gst(err) => {
gst_element_error!(
self.element,
gst::StreamError::Failed,
("Internal data stream error"),
["streaming stopped, reason {}", err]
);
}
SocketError::Io(err) => {
gst_element_error!(
self.element,
gst::StreamError::Failed,
("I/O error"),
["streaming stopped, I/O error {}", err]
);
}
}
return Err(gst::FlowError::Error);
}
None => {
gst_log!(CAT, obj: &self.element, "SocketStream Stopped");
return Err(gst::FlowError::Flushing);
}
};
let pad = self.src_pad.upgrade().expect("PadSrc no longer exists");
let res = self
.src_pad_handler
.push_buffer(&pad, &self.element, buffer)
.await;
match res {
Ok(_) => {
gst_log!(CAT, obj: &self.element, "Successfully pushed buffer");
}
Err(gst::FlowError::Flushing) => {
gst_debug!(CAT, obj: &self.element, "Flushing");
}
Err(gst::FlowError::Eos) => {
gst_debug!(CAT, obj: &self.element, "EOS");
let eos = gst::Event::new_eos().build();
pad.push_event(eos).await;
}
Err(err) => {
gst_error!(CAT, obj: &self.element, "Got error {}", err);
gst_element_error!(
self.element,
gst::StreamError::Failed,
("Internal data stream error"),
["streaming stopped, reason {}", err]
);
}
}
res.map(drop)
}
.boxed()
}
fn stop(&mut self) -> BoxFuture<'_, ()> {
async move {
gst_log!(CAT, obj: &self.element, "Stopping task");
self.src_pad_handler.reset_state().await;
gst_log!(CAT, obj: &self.element, "Task stopped");
}
.boxed()
}
fn flush_stop(&mut self) -> BoxFuture<'_, ()> {
async move {
gst_log!(CAT, obj: &self.element, "Stopping task flush");
self.src_pad_handler.set_need_segment().await;
gst_log!(CAT, obj: &self.element, "Task flush stopped");
}
.boxed()
}
}
struct TcpClientSrc {
src_pad: PadSrc,
src_pad_handler: TcpClientSrcPadHandler,
task: Task,
socket: StdMutex<Option<Socket<TcpClientReader>>>,
settings: StdMutex<Settings>,
}
@ -414,194 +552,54 @@ impl TcpClientSrc {
})?;
let saddr = SocketAddr::new(host, port as u16);
let element_clone = element.clone();
let socket = Socket::new(element.upcast_ref(), buffer_pool, async move {
gst_debug!(CAT, obj: &element_clone, "Connecting to {:?}", saddr);
let socket = tokio::net::TcpStream::connect(saddr)
.await
.map_err(SocketError::Io)?;
Ok(TcpClientReader::new(socket))
})
.map_err(|err| {
gst_error_msg!(
gst::ResourceError::OpenRead,
["Failed to prepare socket {:?}", err]
)
})?;
*self.socket.lock().unwrap() = Some(socket);
self.task.prepare(context).map_err(|err| {
gst_error_msg!(
gst::ResourceError::OpenRead,
["Error preparing Task: {:?}", err]
)
})?;
self.src_pad_handler.prepare(settings.caps);
self.task
.prepare(
TcpClientSrcTask::new(
element,
&self.src_pad,
&self.src_pad_handler,
saddr,
buffer_pool,
),
context,
)
.map_err(|err| {
gst_error_msg!(
gst::ResourceError::OpenRead,
["Error preparing Task: {:?}", err]
)
})?;
gst_debug!(CAT, obj: element, "Prepared");
Ok(())
}
fn unprepare(&self, element: &gst::Element) -> Result<(), ()> {
fn unprepare(&self, element: &gst::Element) {
gst_debug!(CAT, obj: element, "Unpreparing");
*self.socket.lock().unwrap() = None;
self.task.unprepare().unwrap();
gst_debug!(CAT, obj: element, "Unprepared");
Ok(())
}
fn stop(&self, element: &gst::Element) -> Result<(), ()> {
fn stop(&self, element: &gst::Element) {
gst_debug!(CAT, obj: element, "Stopping");
self.task.stop();
self.src_pad_handler.reset_state();
gst_debug!(CAT, obj: element, "Stopped");
Ok(())
}
fn start(&self, element: &gst::Element) -> Result<(), ()> {
let socket = self.socket.lock().unwrap();
let socket = socket.as_ref().unwrap();
if socket.state() == SocketState::Started {
gst_debug!(CAT, obj: element, "Already started");
return Ok(());
}
fn start(&self, element: &gst::Element) {
gst_debug!(CAT, obj: element, "Starting");
self.start_task(element, socket);
self.task.start();
gst_debug!(CAT, obj: element, "Started");
Ok(())
}
fn start_task(&self, element: &gst::Element, socket: &Socket<TcpClientReader>) {
let socket_stream = socket
.start(element.get_clock(), Some(element.get_base_time()))
.unwrap();
let socket_stream = Arc::new(FutMutex::new(socket_stream));
let src_pad_handler = self.src_pad_handler.clone();
let pad_weak = self.src_pad.downgrade();
let element = element.clone();
self.task.start(move || {
let src_pad_handler = src_pad_handler.clone();
let pad_weak = pad_weak.clone();
let element = element.clone();
let socket_stream = socket_stream.clone();
async move {
let item = socket_stream.lock().await.next().await;
let pad = pad_weak.upgrade().expect("PadSrc no longer exists");
let buffer = match item {
Some(Ok((buffer, _))) => buffer,
Some(Err(err)) => {
gst_error!(CAT, obj: &element, "Got error {:?}", err);
match err {
SocketError::Gst(err) => {
gst_element_error!(
element,
gst::StreamError::Failed,
("Internal data stream error"),
["streaming stopped, reason {}", err]
);
}
SocketError::Io(err) => {
gst_element_error!(
element,
gst::StreamError::Failed,
("I/O error"),
["streaming stopped, I/O error {}", err]
);
}
}
return glib::Continue(false);
}
None => {
gst_log!(CAT, obj: pad.gst_pad(), "SocketStream Stopped");
return glib::Continue(false);
}
};
match src_pad_handler.push_buffer(&pad, &element, buffer).await {
Ok(_) => {
gst_log!(CAT, obj: pad.gst_pad(), "Successfully pushed buffer");
glib::Continue(true)
}
Err(gst::FlowError::Flushing) => {
gst_debug!(CAT, obj: pad.gst_pad(), "Flushing");
glib::Continue(false)
}
Err(gst::FlowError::Eos) => {
gst_debug!(CAT, obj: pad.gst_pad(), "EOS");
let eos = gst::Event::new_eos().build();
pad.push_event(eos).await;
glib::Continue(false)
}
Err(err) => {
gst_error!(CAT, obj: pad.gst_pad(), "Got error {}", err);
gst_element_error!(
element,
gst::StreamError::Failed,
("Internal data stream error"),
["streaming stopped, reason {}", err]
);
glib::Continue(false)
}
}
}
});
}
fn flush_stop(&self, element: &gst::Element) {
let socket = self.socket.lock().unwrap();
if let Some(socket) = socket.as_ref() {
if socket.state() == SocketState::Started {
gst_debug!(CAT, obj: element, "Already started");
return;
}
gst_debug!(CAT, obj: element, "Stopping Flush");
self.src_pad_handler.set_need_segment();
self.start_task(element, socket);
gst_debug!(CAT, obj: element, "Stopped Flush");
} else {
gst_debug!(CAT, obj: element, "Socket not available");
}
}
fn flush_start(&self, element: &gst::Element) {
let socket = self.socket.lock().unwrap();
gst_debug!(CAT, obj: element, "Starting Flush");
if let Some(socket) = socket.as_ref() {
socket.pause();
}
self.task.cancel();
gst_debug!(CAT, obj: element, "Flush Started");
}
fn pause(&self, element: &gst::Element) -> Result<(), ()> {
fn pause(&self, element: &gst::Element) {
gst_debug!(CAT, obj: element, "Pausing");
self.socket.lock().unwrap().as_ref().unwrap().pause();
self.task.pause();
gst_debug!(CAT, obj: element, "Paused");
Ok(())
}
}
@ -644,7 +642,6 @@ impl ObjectSubclass for TcpClientSrc {
),
src_pad_handler,
task: Task::default(),
socket: StdMutex::new(None),
settings: StdMutex::new(Settings::default()),
}
}
@ -724,10 +721,10 @@ impl ElementImpl for TcpClientSrc {
})?;
}
gst::StateChange::PlayingToPaused => {
self.pause(element).map_err(|_| gst::StateChangeError)?;
self.pause(element);
}
gst::StateChange::ReadyToNull => {
self.unprepare(element).map_err(|_| gst::StateChangeError)?;
self.unprepare(element);
}
_ => (),
}
@ -739,13 +736,13 @@ impl ElementImpl for TcpClientSrc {
success = gst::StateChangeSuccess::NoPreroll;
}
gst::StateChange::PausedToPlaying => {
self.start(element).map_err(|_| gst::StateChangeError)?;
self.start(element);
}
gst::StateChange::PlayingToPaused => {
success = gst::StateChangeSuccess::NoPreroll;
}
gst::StateChange::PausedToReady => {
self.stop(element).map_err(|_| gst::StateChangeError)?;
self.stop(element);
}
_ => (),
}

View file

@ -827,7 +827,7 @@ impl PadSinkHandler for UdpSinkPadHandler {
async move {
if let EventView::FlushStop(_) = event.view() {
let udpsink = UdpSink::from_instance(&element);
let _ = udpsink.start(&element);
udpsink.task.flush_stop();
} else if let Some(sender) = sender.lock().await.as_mut() {
if sender.send(TaskItem::Event(event)).await.is_err() {
gst_debug!(CAT, obj: &element, "Flushing");
@ -843,17 +843,81 @@ impl PadSinkHandler for UdpSinkPadHandler {
&self,
_pad: &PadSinkRef,
udpsink: &UdpSink,
element: &gst::Element,
_element: &gst::Element,
event: gst::Event,
) -> bool {
if let EventView::FlushStart(..) = event.view() {
let _ = udpsink.stop(&element);
udpsink.task.flush_start();
}
true
}
}
#[derive(Debug)]
struct UdpSinkTask {
element: gst::Element,
sink_pad_handler: UdpSinkPadHandler,
receiver: Option<mpsc::Receiver<TaskItem>>,
}
impl UdpSinkTask {
fn new(element: &gst::Element, sink_pad_handler: &UdpSinkPadHandler) -> Self {
UdpSinkTask {
element: element.clone(),
sink_pad_handler: sink_pad_handler.clone(),
receiver: None,
}
}
}
impl TaskImpl for UdpSinkTask {
fn start(&mut self) -> BoxFuture<'_, ()> {
async move {
gst_log!(CAT, obj: &self.element, "Starting task");
let (sender, receiver) = mpsc::channel(0);
let mut sink_pad_handler = self.sink_pad_handler.0.write().unwrap();
sink_pad_handler.sender = Arc::new(Mutex::new(Some(sender)));
self.receiver = Some(receiver);
gst_log!(CAT, obj: &self.element, "Task started");
}
.boxed()
}
fn iterate(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> {
async move {
match self.receiver.as_mut().unwrap().next().await {
Some(TaskItem::Buffer(buffer)) => {
match self.sink_pad_handler.render(&self.element, buffer).await {
Err(err) => {
gst_element_error!(
&self.element,
gst::StreamError::Failed,
["Failed to render item, stopping task: {}", err]
);
Err(gst::FlowError::Error)
}
_ => Ok(()),
}
}
Some(TaskItem::Event(event)) => {
self.sink_pad_handler
.handle_event(&self.element, event)
.await;
Ok(())
}
None => Err(gst::FlowError::Flushing),
}
}
.boxed()
}
}
#[derive(Debug)]
enum SocketFamily {
Ipv4,
@ -1022,79 +1086,39 @@ impl UdpSink {
self.prepare_socket(SocketFamily::Ipv4, &context, element)?;
self.prepare_socket(SocketFamily::Ipv6, &context, element)?;
self.task.prepare(context).map_err(|err| {
gst_error_msg!(
gst::ResourceError::OpenRead,
["Error preparing Task: {:?}", err]
)
})?;
self.task
.prepare(UdpSinkTask::new(&element, &self.sink_pad_handler), context)
.map_err(|err| {
gst_error_msg!(
gst::ResourceError::OpenRead,
["Error preparing Task: {:?}", err]
)
})?;
gst_debug!(CAT, obj: element, "Started preparing");
Ok(())
}
fn unprepare(&self, element: &gst::Element) -> Result<(), ()> {
fn unprepare(&self, element: &gst::Element) {
gst_debug!(CAT, obj: element, "Unpreparing");
self.task.unprepare().unwrap();
self.sink_pad_handler.unprepare();
gst_debug!(CAT, obj: element, "Unprepared");
Ok(())
}
fn stop(&self, element: &gst::Element) -> Result<(), ()> {
fn stop(&self, element: &gst::Element) {
gst_debug!(CAT, obj: element, "Stopping");
self.task.stop();
gst_debug!(CAT, obj: element, "Stopped");
Ok(())
}
fn start(&self, element: &gst::Element) -> Result<(), ()> {
fn start(&self, element: &gst::Element) {
gst_debug!(CAT, obj: element, "Starting");
let sink_pad_handler = self.sink_pad_handler.clone();
let element_clone = element.clone();
let (sender, receiver) = mpsc::channel(0);
let receiver = Arc::new(Mutex::new(receiver));
sink_pad_handler.0.write().unwrap().sender = Arc::new(Mutex::new(Some(sender)));
self.task.start(move || {
let receiver = Arc::clone(&receiver);
let element = element_clone.clone();
let sink_pad_handler = sink_pad_handler.clone();
async move {
match receiver.lock().await.next().await {
Some(TaskItem::Buffer(buffer)) => {
match sink_pad_handler.render(&element, buffer).await {
Err(err) => {
gst_element_error!(
element,
gst::StreamError::Failed,
["Failed to render item, stopping task: {}", err]
);
glib::Continue(false)
}
_ => glib::Continue(true),
}
}
Some(TaskItem::Event(event)) => {
sink_pad_handler.handle_event(&element, event).await;
glib::Continue(true)
}
None => glib::Continue(false),
}
}
});
Ok(())
self.task.start();
gst_debug!(CAT, obj: element, "Started");
}
}
@ -1491,13 +1515,13 @@ impl ElementImpl for UdpSink {
})?;
}
gst::StateChange::ReadyToPaused => {
self.start(element).map_err(|_| gst::StateChangeError)?;
self.start(element);
}
gst::StateChange::PausedToReady => {
self.stop(element).map_err(|_| gst::StateChangeError)?;
self.stop(element);
}
gst::StateChange::ReadyToNull => {
self.unprepare(element).map_err(|_| gst::StateChangeError)?;
self.unprepare(element);
}
_ => (),
}

View file

@ -39,9 +39,9 @@ use std::sync::Mutex as StdMutex;
use std::u16;
use crate::runtime::prelude::*;
use crate::runtime::{Context, PadSrc, PadSrcRef, Task};
use crate::runtime::{Context, PadSrc, PadSrcRef, PadSrcWeak, Task};
use super::socket::{wrap_socket, GioSocketWrapper, Socket, SocketError, SocketRead, SocketState};
use super::socket::{wrap_socket, GioSocketWrapper, Socket, SocketError, SocketRead};
const DEFAULT_ADDRESS: Option<&str> = Some("0.0.0.0");
const DEFAULT_PORT: i32 = 5000;
@ -185,16 +185,11 @@ static PROPERTIES: [subclass::Property; 10] = [
];
#[derive(Debug)]
struct UdpReaderInner {
socket: tokio::net::UdpSocket,
}
#[derive(Debug)]
pub struct UdpReader(Arc<FutMutex<UdpReaderInner>>);
pub struct UdpReader(tokio::net::UdpSocket);
impl UdpReader {
fn new(socket: tokio::net::UdpSocket) -> Self {
UdpReader(Arc::new(FutMutex::new(UdpReaderInner { socket })))
UdpReader(socket)
}
}
@ -202,15 +197,11 @@ impl SocketRead for UdpReader {
const DO_TIMESTAMP: bool = true;
fn read<'buf>(
&self,
&'buf mut self,
buffer: &'buf mut [u8],
) -> BoxFuture<'buf, io::Result<(usize, Option<std::net::SocketAddr>)>> {
let this = Arc::clone(&self.0);
async move {
this.lock()
.await
.socket
self.0
.recv_from(buffer)
.await
.map(|(read_size, saddr)| (read_size, Some(saddr)))
@ -255,16 +246,12 @@ impl UdpSrcPadHandler {
state.retrieve_sender_address = retrieve_sender_address;
}
fn reset_state(&self) {
*self.0.state.try_lock().expect("State locked elsewhere") = Default::default();
async fn reset_state(&self) {
*self.0.state.lock().await = Default::default();
}
fn set_need_segment(&self) {
self.0
.state
.try_lock()
.expect("State locked elsewhere")
.need_segment = true;
async fn set_need_segment(&self) {
self.0.state.lock().await.need_segment = true;
}
async fn push_prelude(&self, pad: &PadSrcRef<'_>, _element: &gst::Element) {
@ -317,7 +304,7 @@ impl PadSrcHandler for UdpSrcPadHandler {
&self,
pad: &PadSrcRef,
udpsrc: &UdpSrc,
element: &gst::Element,
_element: &gst::Element,
event: gst::Event,
) -> bool {
use gst::EventView;
@ -326,12 +313,12 @@ impl PadSrcHandler for UdpSrcPadHandler {
let ret = match event.view() {
EventView::FlushStart(..) => {
udpsrc.flush_start(element);
udpsrc.task.flush_start();
true
}
EventView::FlushStop(..) => {
udpsrc.flush_stop(element);
udpsrc.task.flush_stop();
true
}
@ -398,11 +385,148 @@ impl PadSrcHandler for UdpSrcPadHandler {
}
}
struct UdpSrcTask {
element: gst::Element,
src_pad: PadSrcWeak,
src_pad_handler: UdpSrcPadHandler,
socket: Socket<UdpReader>,
}
impl UdpSrcTask {
fn new(
element: &gst::Element,
src_pad: &PadSrc,
src_pad_handler: &UdpSrcPadHandler,
socket: Socket<UdpReader>,
) -> Self {
UdpSrcTask {
element: element.clone(),
src_pad: src_pad.downgrade(),
src_pad_handler: src_pad_handler.clone(),
socket,
}
}
}
impl TaskImpl for UdpSrcTask {
fn start(&mut self) -> BoxFuture<'_, ()> {
async move {
gst_log!(CAT, obj: &self.element, "Starting task");
self.socket
.set_clock(self.element.get_clock(), Some(self.element.get_base_time()));
gst_log!(CAT, obj: &self.element, "Task started");
}
.boxed()
}
fn iterate(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> {
async move {
let item = self.socket.next().await;
let (mut buffer, saddr) = match item {
Some(Ok((buffer, saddr))) => (buffer, saddr),
Some(Err(err)) => {
gst_error!(CAT, obj: &self.element, "Got error {:?}", err);
match err {
SocketError::Gst(err) => {
gst_element_error!(
self.element,
gst::StreamError::Failed,
("Internal data stream error"),
["streaming stopped, reason {}", err]
);
}
SocketError::Io(err) => {
gst_element_error!(
self.element,
gst::StreamError::Failed,
("I/O error"),
["streaming stopped, I/O error {}", err]
);
}
}
return Err(gst::FlowError::Error);
}
None => {
gst_log!(CAT, obj: &self.element, "SocketStream Stopped");
return Err(gst::FlowError::Flushing);
}
};
if let Some(saddr) = saddr {
if self
.src_pad_handler
.0
.state
.lock()
.await
.retrieve_sender_address
{
let inet_addr = match saddr.ip() {
IpAddr::V4(ip) => gio::InetAddress::new_from_bytes(
gio::InetAddressBytes::V4(&ip.octets()),
),
IpAddr::V6(ip) => gio::InetAddress::new_from_bytes(
gio::InetAddressBytes::V6(&ip.octets()),
),
};
let inet_socket_addr = &gio::InetSocketAddress::new(&inet_addr, saddr.port());
NetAddressMeta::add(buffer.get_mut().unwrap(), inet_socket_addr);
}
}
let pad = self.src_pad.upgrade().expect("PadSrc no longer exists");
let res = self
.src_pad_handler
.push_buffer(&pad, &self.element, buffer)
.await;
match res {
Ok(_) => gst_log!(CAT, obj: &self.element, "Successfully pushed buffer"),
Err(gst::FlowError::Flushing) => gst_debug!(CAT, obj: &self.element, "Flushing"),
Err(gst::FlowError::Eos) => {
gst_debug!(CAT, obj: &self.element, "EOS");
let eos = gst::Event::new_eos().build();
pad.push_event(eos).await;
}
Err(err) => {
gst_error!(CAT, obj: &self.element, "Got error {}", err);
gst_element_error!(
self.element,
gst::StreamError::Failed,
("Internal data stream error"),
["streaming stopped, reason {}", err]
);
}
}
res.map(drop)
}
.boxed()
}
fn stop(&mut self) -> BoxFuture<'_, ()> {
async move {
gst_log!(CAT, obj: &self.element, "Stopping task");
self.src_pad_handler.reset_state().await;
gst_log!(CAT, obj: &self.element, "Task stopped");
}
.boxed()
}
fn flush_stop(&mut self) -> BoxFuture<'_, ()> {
async move {
gst_log!(CAT, obj: &self.element, "Stopping task flush");
self.src_pad_handler.set_need_segment().await;
gst_log!(CAT, obj: &self.element, "Stopped task flush");
}
.boxed()
}
}
struct UdpSrc {
src_pad: PadSrc,
src_pad_handler: UdpSrcPadHandler,
task: Task,
socket: StdMutex<Option<Socket<UdpReader>>>,
settings: StdMutex<Settings>,
}
@ -585,210 +709,63 @@ impl UdpSrc {
)
})?;
let socket = Socket::new(element.upcast_ref(), buffer_pool, async move {
Ok(UdpReader::new(socket))
})
.map_err(|err| {
gst_error_msg!(
gst::ResourceError::OpenRead,
["Failed to prepare socket {:?}", err]
)
})?;
let socket = Socket::try_new(element.clone(), buffer_pool, UdpReader::new(socket))
.map_err(|err| {
gst_error_msg!(
gst::ResourceError::OpenRead,
["Failed to prepare socket {:?}", err]
)
})?;
*self.socket.lock().unwrap() = Some(socket);
element.notify("used-socket");
self.task.prepare(context).map_err(|err| {
gst_error_msg!(
gst::ResourceError::OpenRead,
["Error preparing Task: {:?}", err]
)
})?;
self.src_pad_handler
.prepare(settings.caps, settings.retrieve_sender_address);
self.task
.prepare(
UdpSrcTask::new(element, &self.src_pad, &self.src_pad_handler, socket),
context,
)
.map_err(|err| {
gst_error_msg!(
gst::ResourceError::OpenRead,
["Error preparing Task: {:?}", err]
)
})?;
gst_debug!(CAT, obj: element, "Prepared");
Ok(())
}
fn unprepare(&self, element: &gst::Element) -> Result<(), ()> {
fn unprepare(&self, element: &gst::Element) {
gst_debug!(CAT, obj: element, "Unpreparing");
*self.socket.lock().unwrap() = None;
self.settings.lock().unwrap().used_socket = None;
element.notify("used-socket");
self.task.unprepare().unwrap();
gst_debug!(CAT, obj: element, "Unprepared");
Ok(())
}
fn stop(&self, element: &gst::Element) -> Result<(), ()> {
fn stop(&self, element: &gst::Element) {
gst_debug!(CAT, obj: element, "Stopping");
self.task.stop();
self.src_pad_handler.reset_state();
gst_debug!(CAT, obj: element, "Stopped");
Ok(())
}
fn start(&self, element: &gst::Element) -> Result<(), ()> {
let socket = self.socket.lock().unwrap();
let socket = socket.as_ref().unwrap();
if socket.state() == SocketState::Started {
gst_debug!(CAT, obj: element, "Already started");
return Ok(());
}
fn start(&self, element: &gst::Element) {
gst_debug!(CAT, obj: element, "Starting");
self.start_task(element, socket);
self.task.start();
gst_debug!(CAT, obj: element, "Started");
Ok(())
}
fn start_task(&self, element: &gst::Element, socket: &Socket<UdpReader>) {
let socket_stream = socket
.start(element.get_clock(), Some(element.get_base_time()))
.unwrap();
let socket_stream = Arc::new(FutMutex::new(socket_stream));
let src_pad_handler = self.src_pad_handler.clone();
let pad_weak = self.src_pad.downgrade();
let element = element.clone();
self.task.start(move || {
let src_pad_handler = src_pad_handler.clone();
let pad_weak = pad_weak.clone();
let element = element.clone();
let socket_stream = socket_stream.clone();
async move {
let item = socket_stream.lock().await.next().await;
let pad = pad_weak.upgrade().expect("PadSrc no longer exists");
let (mut buffer, saddr) = match item {
Some(Ok((buffer, saddr))) => (buffer, saddr),
Some(Err(err)) => {
gst_error!(CAT, obj: &element, "Got error {:?}", err);
match err {
SocketError::Gst(err) => {
gst_element_error!(
element,
gst::StreamError::Failed,
("Internal data stream error"),
["streaming stopped, reason {}", err]
);
}
SocketError::Io(err) => {
gst_element_error!(
element,
gst::StreamError::Failed,
("I/O error"),
["streaming stopped, I/O error {}", err]
);
}
}
return glib::Continue(false);
}
None => {
gst_log!(CAT, obj: pad.gst_pad(), "SocketStream Stopped");
return glib::Continue(false);
}
};
if let Some(saddr) = saddr {
if src_pad_handler.0.state.lock().await.retrieve_sender_address {
let inet_addr = match saddr.ip() {
IpAddr::V4(ip) => gio::InetAddress::new_from_bytes(
gio::InetAddressBytes::V4(&ip.octets()),
),
IpAddr::V6(ip) => gio::InetAddress::new_from_bytes(
gio::InetAddressBytes::V6(&ip.octets()),
),
};
let inet_socket_addr =
&gio::InetSocketAddress::new(&inet_addr, saddr.port());
NetAddressMeta::add(buffer.get_mut().unwrap(), inet_socket_addr);
}
}
match src_pad_handler.push_buffer(&pad, &element, buffer).await {
Ok(_) => {
gst_log!(CAT, obj: pad.gst_pad(), "Successfully pushed buffer");
glib::Continue(true)
}
Err(gst::FlowError::Flushing) => {
gst_debug!(CAT, obj: pad.gst_pad(), "Flushing");
glib::Continue(false)
}
Err(gst::FlowError::Eos) => {
gst_debug!(CAT, obj: pad.gst_pad(), "EOS");
let eos = gst::Event::new_eos().build();
pad.push_event(eos).await;
glib::Continue(false)
}
Err(err) => {
gst_error!(CAT, obj: pad.gst_pad(), "Got error {}", err);
gst_element_error!(
element,
gst::StreamError::Failed,
("Internal data stream error"),
["streaming stopped, reason {}", err]
);
glib::Continue(false)
}
}
}
});
}
fn flush_stop(&self, element: &gst::Element) {
let socket = self.socket.lock().unwrap();
if let Some(socket) = socket.as_ref() {
if socket.state() == SocketState::Started {
gst_debug!(CAT, obj: element, "Already started");
return;
}
gst_debug!(CAT, obj: element, "Stopping Flush");
self.src_pad_handler.set_need_segment();
self.start_task(element, socket);
gst_debug!(CAT, obj: element, "Stopped Flush");
} else {
gst_debug!(CAT, obj: element, "Socket not available");
}
}
fn flush_start(&self, element: &gst::Element) {
let socket = self.socket.lock().unwrap();
gst_debug!(CAT, obj: element, "Starting Flush");
if let Some(socket) = socket.as_ref() {
socket.pause();
}
self.task.cancel();
gst_debug!(CAT, obj: element, "Flush Started");
}
fn pause(&self, element: &gst::Element) -> Result<(), ()> {
fn pause(&self, element: &gst::Element) {
gst_debug!(CAT, obj: element, "Pausing");
self.socket.lock().unwrap().as_ref().unwrap().pause();
self.task.pause();
gst_debug!(CAT, obj: element, "Paused");
Ok(())
}
}
@ -847,7 +824,6 @@ impl ObjectSubclass for UdpSrc {
),
src_pad_handler,
task: Task::default(),
socket: StdMutex::new(None),
settings: StdMutex::new(Settings::default()),
}
}
@ -956,10 +932,10 @@ impl ElementImpl for UdpSrc {
})?;
}
gst::StateChange::PlayingToPaused => {
self.pause(element).map_err(|_| gst::StateChangeError)?;
self.pause(element);
}
gst::StateChange::ReadyToNull => {
self.unprepare(element).map_err(|_| gst::StateChangeError)?;
self.unprepare(element);
}
_ => (),
}
@ -971,13 +947,13 @@ impl ElementImpl for UdpSrc {
success = gst::StateChangeSuccess::NoPreroll;
}
gst::StateChange::PausedToPlaying => {
self.start(element).map_err(|_| gst::StateChangeError)?;
self.start(element);
}
gst::StateChange::PlayingToPaused => {
success = gst::StateChangeSuccess::NoPreroll;
}
gst::StateChange::PausedToReady => {
self.stop(element).map_err(|_| gst::StateChangeError)?;
self.stop(element);
}
_ => (),
}

View file

@ -95,7 +95,7 @@ fn push() {
}
#[test]
fn pause() {
fn pause_regular() {
init();
let mut h = gst_check::Harness::new("ts-appsrc");
@ -122,10 +122,6 @@ fn pause() {
let _ = h.pull().unwrap();
appsrc
.change_state(gst::StateChange::PlayingToPaused)
.unwrap();
// Pre-pause buffer
assert!(appsrc
.emit("push-buffer", &[&gst::Buffer::from_slice(vec![5, 6, 7])])
@ -169,7 +165,7 @@ fn pause() {
}
#[test]
fn flush() {
fn flush_regular() {
init();
let mut h = gst_check::Harness::new("ts-appsrc");
@ -264,7 +260,7 @@ fn pause_flush() {
// FlushStart
assert!(h.push_upstream_event(gst::Event::new_flush_start().build()));
// Can't push buffer while flushing
// Can't push buffers while flushing
assert!(!appsrc
.emit("push-buffer", &[&gst::Buffer::new()])
.unwrap()

View file

@ -33,11 +33,12 @@ use lazy_static::lazy_static;
use std::boxed::Box;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::sync::Mutex as StdMutex;
use gstthreadshare::runtime::prelude::*;
use gstthreadshare::runtime::{Context, PadSink, PadSinkRef, PadSrc, PadSrcRef, Task};
use gstthreadshare::runtime::{
Context, PadSink, PadSinkRef, PadSrc, PadSrcRef, PadSrcWeak, Task, TaskState,
};
const DEFAULT_CONTEXT: &str = "";
const THROTTLING_DURATION: u32 = 2;
@ -82,7 +83,10 @@ lazy_static! {
struct PadSrcTestHandler;
impl PadSrcTestHandler {
async fn push_item(pad: PadSrcRef<'_>, item: Item) -> Result<gst::FlowSuccess, gst::FlowError> {
async fn push_item(
pad: &PadSrcRef<'_>,
item: Item,
) -> Result<gst::FlowSuccess, gst::FlowError> {
gst_debug!(SRC_CAT, obj: pad.gst_pad(), "Handling {:?}", item);
match item {
@ -104,19 +108,19 @@ impl PadSrcHandler for PadSrcTestHandler {
&self,
pad: &PadSrcRef,
elem_src_test: &ElementSrcTest,
element: &gst::Element,
_element: &gst::Element,
event: gst::Event,
) -> bool {
gst_log!(SRC_CAT, obj: pad.gst_pad(), "Handling {:?}", event);
let ret = match event.view() {
EventView::FlushStart(..) => {
elem_src_test.flush_start(element);
elem_src_test.task.flush_start();
true
}
EventView::Qos(..) | EventView::Reconfigure(..) | EventView::Latency(..) => true,
EventView::FlushStop(..) => {
elem_src_test.flush_stop(&element);
elem_src_test.task.flush_stop();
true
}
_ => false,
@ -132,27 +136,89 @@ impl PadSrcHandler for PadSrcTestHandler {
}
}
#[derive(Debug, Eq, PartialEq)]
enum ElementSrcTestState {
Paused,
RejectItems,
Started,
#[derive(Debug)]
struct ElementSrcTestTask {
element: gst::Element,
src_pad: PadSrcWeak,
receiver: mpsc::Receiver<Item>,
}
impl ElementSrcTestTask {
fn new(element: &gst::Element, src_pad: &PadSrc, receiver: mpsc::Receiver<Item>) -> Self {
ElementSrcTestTask {
element: element.clone(),
src_pad: src_pad.downgrade(),
receiver,
}
}
}
impl ElementSrcTestTask {
fn flush(&mut self) {
// Purge the channel
while let Ok(Some(_item)) = self.receiver.try_next() {}
}
}
impl TaskImpl for ElementSrcTestTask {
fn iterate(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> {
async move {
let item = self.receiver.next().await;
let item = match item {
Some(item) => item,
None => {
gst_log!(SRC_CAT, obj: &self.element, "SrcPad channel aborted");
return Err(gst::FlowError::Eos);
}
};
let pad = self.src_pad.upgrade().expect("PadSrc no longer exists");
let res = PadSrcTestHandler::push_item(&pad, item).await;
match res {
Ok(_) => gst_log!(SRC_CAT, obj: &self.element, "Successfully pushed item"),
Err(gst::FlowError::Flushing) => {
gst_debug!(SRC_CAT, obj: &self.element, "Flushing")
}
Err(err) => panic!("Got error {}", err),
}
res.map(drop)
}
.boxed()
}
fn stop(&mut self) -> BoxFuture<'_, ()> {
async move {
gst_log!(SRC_CAT, obj: &self.element, "Stopping task");
self.flush();
gst_log!(SRC_CAT, obj: &self.element, "Task stopped");
}
.boxed()
}
fn flush_start(&mut self) -> BoxFuture<'_, ()> {
async move {
gst_log!(SRC_CAT, obj: &self.element, "Starting task flush");
self.flush();
gst_log!(SRC_CAT, obj: &self.element, "Task flush started");
}
.boxed()
}
}
#[derive(Debug)]
struct ElementSrcTest {
src_pad: PadSrc,
task: Task,
state: StdMutex<ElementSrcTestState>,
sender: StdMutex<Option<mpsc::Sender<Item>>>,
receiver: StdMutex<Option<Arc<FutMutex<mpsc::Receiver<Item>>>>>,
settings: StdMutex<Settings>,
}
impl ElementSrcTest {
fn try_push(&self, item: Item) -> Result<(), Item> {
let state = self.state.lock().unwrap();
if *state == ElementSrcTestState::RejectItems {
let state = self.task.lock_state();
if *state != TaskState::Started && *state != TaskState::Paused {
gst_debug!(SRC_CAT, "ElementSrcTest rejecting item due to pad state");
return Err(item);
@ -177,162 +243,51 @@ impl ElementSrcTest {
)
})?;
self.task.prepare(context).map_err(|err| {
gst_error_msg!(
gst::ResourceError::OpenRead,
["Error preparing Task: {:?}", err]
)
})?;
let (sender, receiver) = mpsc::channel(1);
*self.sender.lock().unwrap() = Some(sender);
*self.receiver.lock().unwrap() = Some(Arc::new(FutMutex::new(receiver)));
self.task
.prepare(
ElementSrcTestTask::new(element, &self.src_pad, receiver),
context,
)
.map_err(|err| {
gst_error_msg!(
gst::ResourceError::OpenRead,
["Error preparing Task: {:?}", err]
)
})?;
gst_debug!(SRC_CAT, obj: element, "Prepared");
Ok(())
}
fn unprepare(&self, element: &gst::Element) -> Result<(), ()> {
fn unprepare(&self, element: &gst::Element) {
gst_debug!(SRC_CAT, obj: element, "Unpreparing");
*self.sender.lock().unwrap() = None;
self.task.unprepare().unwrap();
*self.sender.lock().unwrap() = None;
*self.receiver.lock().unwrap() = None;
gst_debug!(SRC_CAT, obj: element, "Unprepared");
Ok(())
}
fn stop(&self, element: &gst::Element) -> Result<(), ()> {
let mut state = self.state.lock().unwrap();
fn stop(&self, element: &gst::Element) {
gst_debug!(SRC_CAT, obj: element, "Stopping");
self.flush(element);
*state = ElementSrcTestState::RejectItems;
gst_debug!(SRC_CAT, obj: element, "Stopped");
Ok(())
}
fn flush(&self, element: &gst::Element) {
gst_debug!(SRC_CAT, obj: element, "Flushing");
self.task.stop();
let receiver = self.receiver.lock().unwrap();
let mut receiver = receiver
.as_ref()
.unwrap()
.try_lock()
.expect("receiver locked elsewhere");
// Purge the channel
loop {
match receiver.try_next() {
Ok(Some(_item)) => {
gst_debug!(SRC_CAT, obj: element, "Dropping pending item");
}
Err(_) => {
gst_debug!(SRC_CAT, obj: element, "No more pending item");
break;
}
Ok(None) => {
panic!("Channel sender dropped");
}
}
}
gst_debug!(SRC_CAT, obj: element, "Flushed");
gst_debug!(SRC_CAT, obj: element, "Stopped");
}
fn start(&self, element: &gst::Element) -> Result<(), ()> {
let mut state = self.state.lock().unwrap();
if *state == ElementSrcTestState::Started {
gst_debug!(SRC_CAT, obj: element, "Already started");
return Err(());
}
fn start(&self, element: &gst::Element) {
gst_debug!(SRC_CAT, obj: element, "Starting");
self.start_task();
*state = ElementSrcTestState::Started;
self.task.start();
gst_debug!(SRC_CAT, obj: element, "Started");
Ok(())
}
fn start_task(&self) {
let pad_weak = self.src_pad.downgrade();
let receiver = Arc::clone(self.receiver.lock().unwrap().as_ref().expect("No receiver"));
self.task.start(move || {
let pad_weak = pad_weak.clone();
let receiver = Arc::clone(&receiver);
async move {
let item = receiver.lock().await.next().await;
let pad = pad_weak.upgrade().expect("PadSrc no longer exists");
let item = match item {
Some(item) => item,
None => {
gst_log!(SRC_CAT, obj: pad.gst_pad(), "SrcPad channel aborted");
return glib::Continue(false);
}
};
let pad = pad_weak.upgrade().expect("PadSrc no longer exists");
match PadSrcTestHandler::push_item(pad, item).await {
Ok(_) => glib::Continue(true),
Err(gst::FlowError::Flushing) => glib::Continue(false),
Err(err) => panic!("Got error {:?}", err),
}
}
});
}
fn flush_stop(&self, element: &gst::Element) {
let mut state = self.state.lock().unwrap();
if *state == ElementSrcTestState::Started {
gst_debug!(SRC_CAT, obj: element, "Already started");
return;
}
gst_debug!(SRC_CAT, obj: element, "Stopping Flush");
self.flush(element);
self.start_task();
*state = ElementSrcTestState::Started;
gst_debug!(SRC_CAT, obj: element, "Stopped Flush");
}
fn flush_start(&self, element: &gst::Element) {
let mut state = self.state.lock().unwrap();
gst_debug!(SRC_CAT, obj: element, "Starting Flush");
self.task.cancel();
*state = ElementSrcTestState::RejectItems;
gst_debug!(SRC_CAT, obj: element, "Flush Started");
}
fn pause(&self, element: &gst::Element) -> Result<(), ()> {
let mut state = self.state.lock().unwrap();
fn pause(&self, element: &gst::Element) {
gst_debug!(SRC_CAT, obj: element, "Pausing");
self.task.pause();
*state = ElementSrcTestState::Paused;
gst_debug!(SRC_CAT, obj: element, "Paused");
Ok(())
}
}
@ -372,9 +327,7 @@ impl ObjectSubclass for ElementSrcTest {
PadSrcTestHandler,
),
task: Task::default(),
state: StdMutex::new(ElementSrcTestState::RejectItems),
sender: StdMutex::new(None),
receiver: StdMutex::new(None),
settings: StdMutex::new(Settings::default()),
}
}
@ -423,10 +376,10 @@ impl ElementImpl for ElementSrcTest {
})?;
}
gst::StateChange::PlayingToPaused => {
self.pause(element).map_err(|_| gst::StateChangeError)?;
self.pause(element);
}
gst::StateChange::ReadyToNull => {
self.unprepare(element).map_err(|_| gst::StateChangeError)?;
self.unprepare(element);
}
_ => (),
}
@ -435,10 +388,10 @@ impl ElementImpl for ElementSrcTest {
match transition {
gst::StateChange::PausedToReady => {
self.stop(element).map_err(|_| gst::StateChangeError)?;
self.stop(element);
}
gst::StateChange::PausedToPlaying => {
self.start(element).map_err(|_| gst::StateChangeError)?;
self.start(element);
}
gst::StateChange::ReadyToPaused | gst::StateChange::PlayingToPaused => {
success = gst::StateChangeSuccess::NoPreroll;
@ -449,13 +402,13 @@ impl ElementImpl for ElementSrcTest {
Ok(success)
}
fn send_event(&self, element: &gst::Element, event: gst::Event) -> bool {
fn send_event(&self, _element: &gst::Element, event: gst::Event) -> bool {
match event.view() {
EventView::FlushStart(..) => {
self.flush_start(element);
self.task.flush_start();
}
EventView::FlushStop(..) => {
self.flush_stop(element);
self.task.flush_stop();
}
_ => (),
}

View file

@ -406,12 +406,15 @@ fn eos() {
eos_notif_rcv.recv().unwrap();
assert!(push_buffer(&src));
std::thread::sleep(std::time::Duration::from_millis(50));
assert_eq!(
sample_notif_rcv.try_recv().unwrap_err(),
mpsc::TryRecvError::Empty
);
// FIXME not ideal, but better than previous approach.
// I think the "end-of-stream" signal should block
// until the **src** element has actually reached EOS
loop {
std::thread::sleep(std::time::Duration::from_millis(10));
if !push_buffer(&src) {
break;
}
}
pipeline_clone.set_state(gst::State::Null).unwrap();
l_clone.quit();