ts/appsrc: reduce sync primitives in async hot path

This commit is contained in:
François Laignel 2022-06-25 15:13:58 +02:00 committed by Sebastian Dröge
parent a1b89c1fb9
commit 5720faa808

View file

@ -1,5 +1,5 @@
// Copyright (C) 2018 Sebastian Dröge <sebastian@centricular.com>
// Copyright (C) 2019-2020 François Laignel <fengalin@free.fr>
// Copyright (C) 2019-2022 François Laignel <fengalin@free.fr>
//
// This library is free software; you can redistribute it and/or
// modify it under the terms of the GNU Library General Public
@ -20,7 +20,6 @@
use futures::channel::mpsc;
use futures::future::BoxFuture;
use futures::lock::Mutex as FutMutex;
use futures::prelude::*;
use gst::glib;
@ -29,13 +28,12 @@ use gst::subclass::prelude::*;
use once_cell::sync::Lazy;
use std::sync::Arc;
use std::sync::Mutex as StdMutex;
use std::sync::Mutex;
use std::time::Duration;
use std::u32;
use crate::runtime::prelude::*;
use crate::runtime::{Context, PadSrc, PadSrcRef, PadSrcWeak, Task, TaskState};
use crate::runtime::{Context, PadSrc, PadSrcRef, Task, TaskState};
const DEFAULT_CONTEXT: &str = "";
const DEFAULT_CONTEXT_WAIT: Duration = Duration::ZERO;
@ -78,108 +76,8 @@ enum StreamItem {
Event(gst::Event),
}
#[derive(Debug)]
struct AppSrcPadHandlerState {
need_initial_events: bool,
need_segment: bool,
caps: Option<gst::Caps>,
}
impl Default for AppSrcPadHandlerState {
fn default() -> Self {
AppSrcPadHandlerState {
need_initial_events: true,
need_segment: true,
caps: None,
}
}
}
#[derive(Debug, Default)]
struct AppSrcPadHandlerInner {
state: FutMutex<AppSrcPadHandlerState>,
configured_caps: StdMutex<Option<gst::Caps>>,
}
#[derive(Clone, Debug, Default)]
struct AppSrcPadHandler(Arc<AppSrcPadHandlerInner>);
impl AppSrcPadHandler {
fn prepare(&self, caps: Option<gst::Caps>) {
self.0
.state
.try_lock()
.expect("State locked elsewhere")
.caps = caps;
}
async fn reset_state(&self) {
*self.0.state.lock().await = Default::default();
}
async fn set_need_segment(&self) {
self.0.state.lock().await.need_segment = true;
}
async fn push_prelude(&self, pad: &PadSrcRef<'_>, _element: &super::AppSrc) {
let mut state = self.0.state.lock().await;
if state.need_initial_events {
gst::debug!(CAT, obj: pad.gst_pad(), "Pushing initial events");
let stream_id = format!("{:08x}{:08x}", rand::random::<u32>(), rand::random::<u32>());
let stream_start_evt = gst::event::StreamStart::builder(&stream_id)
.group_id(gst::GroupId::next())
.build();
pad.push_event(stream_start_evt).await;
if let Some(ref caps) = state.caps {
pad.push_event(gst::event::Caps::new(caps)).await;
*self.0.configured_caps.lock().unwrap() = Some(caps.clone());
}
state.need_initial_events = false;
}
if state.need_segment {
let segment_evt =
gst::event::Segment::new(&gst::FormattedSegment::<gst::format::Time>::new());
pad.push_event(segment_evt).await;
state.need_segment = false;
}
}
async fn push_item(
&self,
pad: &PadSrcRef<'_>,
element: &super::AppSrc,
item: StreamItem,
) -> Result<gst::FlowSuccess, gst::FlowError> {
gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", item);
self.push_prelude(pad, element).await;
match item {
StreamItem::Buffer(buffer) => {
gst::log!(CAT, obj: pad.gst_pad(), "Forwarding {:?}", buffer);
pad.push(buffer).await
}
StreamItem::Event(event) => {
match event.view() {
gst::EventView::Eos(_) => {
// Let the caller push the event
Err(gst::FlowError::Eos)
}
_ => {
gst::log!(CAT, obj: pad.gst_pad(), "Forwarding {:?}", event);
pad.push_event(event).await;
Ok(gst::FlowSuccess::Ok)
}
}
}
}
}
}
#[derive(Clone, Debug)]
struct AppSrcPadHandler;
impl PadSrcHandler for AppSrcPadHandler {
type ElementImpl = AppSrc;
@ -215,7 +113,7 @@ impl PadSrcHandler for AppSrcPadHandler {
fn src_query(
&self,
pad: &PadSrcRef,
_appsrc: &AppSrc,
appsrc: &AppSrc,
_element: &gst::Element,
query: &mut gst::QueryRef,
) -> bool {
@ -233,7 +131,7 @@ impl PadSrcHandler for AppSrcPadHandler {
true
}
QueryViewMut::Caps(q) => {
let caps = if let Some(caps) = self.0.configured_caps.lock().unwrap().as_ref() {
let caps = if let Some(caps) = appsrc.configured_caps.lock().unwrap().as_ref() {
q.filter()
.map(|f| f.intersect_with_mode(caps, gst::CapsIntersectMode::First))
.unwrap_or_else(|| caps.clone())
@ -262,23 +160,18 @@ impl PadSrcHandler for AppSrcPadHandler {
#[derive(Debug)]
struct AppSrcTask {
element: super::AppSrc,
src_pad: PadSrcWeak,
src_pad_handler: AppSrcPadHandler,
receiver: mpsc::Receiver<StreamItem>,
need_initial_events: bool,
need_segment: bool,
}
impl AppSrcTask {
fn new(
element: &super::AppSrc,
src_pad: &PadSrc,
src_pad_handler: &AppSrcPadHandler,
receiver: mpsc::Receiver<StreamItem>,
) -> Self {
fn new(element: super::AppSrc, receiver: mpsc::Receiver<StreamItem>) -> Self {
AppSrcTask {
element: element.clone(),
src_pad: src_pad.downgrade(),
src_pad_handler: src_pad_handler.clone(),
element,
receiver,
need_initial_events: true,
need_segment: true,
}
}
}
@ -288,37 +181,85 @@ impl AppSrcTask {
// Purge the channel
while let Ok(Some(_item)) = self.receiver.try_next() {}
}
async fn push_item(&mut self, item: StreamItem) -> Result<gst::FlowSuccess, gst::FlowError> {
gst::log!(CAT, obj: &self.element, "Handling {:?}", item);
let appsrc = self.element.imp();
if self.need_initial_events {
gst::debug!(CAT, obj: &self.element, "Pushing initial events");
let stream_id = format!("{:08x}{:08x}", rand::random::<u32>(), rand::random::<u32>());
let stream_start_evt = gst::event::StreamStart::builder(&stream_id)
.group_id(gst::GroupId::next())
.build();
appsrc.src_pad.push_event(stream_start_evt).await;
let caps = appsrc.settings.lock().unwrap().caps.clone();
if let Some(caps) = caps {
appsrc
.src_pad
.push_event(gst::event::Caps::new(&caps))
.await;
*appsrc.configured_caps.lock().unwrap() = Some(caps.clone());
}
self.need_initial_events = false;
}
if self.need_segment {
let segment_evt =
gst::event::Segment::new(&gst::FormattedSegment::<gst::format::Time>::new());
appsrc.src_pad.push_event(segment_evt).await;
self.need_segment = false;
}
match item {
StreamItem::Buffer(buffer) => {
gst::log!(CAT, obj: &self.element, "Forwarding {:?}", buffer);
appsrc.src_pad.push(buffer).await
}
StreamItem::Event(event) => {
match event.view() {
gst::EventView::Eos(_) => {
// Let the caller push the event
Err(gst::FlowError::Eos)
}
_ => {
gst::log!(CAT, obj: &self.element, "Forwarding {:?}", event);
appsrc.src_pad.push_event(event).await;
Ok(gst::FlowSuccess::Ok)
}
}
}
}
}
}
impl TaskImpl for AppSrcTask {
fn iterate(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> {
async move {
let item = match self.receiver.next().await {
Some(item) => item,
None => {
gst::error!(CAT, obj: &self.element, "SrcPad channel aborted");
gst::element_error!(
&self.element,
gst::StreamError::Failed,
("Internal data stream error"),
["streaming stopped, reason: channel aborted"]
);
return Err(gst::FlowError::Flushing);
}
};
let item = self.receiver.next().await.ok_or_else(|| {
gst::error!(CAT, obj: &self.element, "SrcPad channel aborted");
gst::element_error!(
&self.element,
gst::StreamError::Failed,
("Internal data stream error"),
["streaming stopped, reason: channel aborted"]
);
gst::FlowError::Flushing
})?;
let pad = self.src_pad.upgrade().expect("PadSrc no longer exists");
let res = self
.src_pad_handler
.push_item(&pad, &self.element, item)
.await;
let res = self.push_item(item).await;
match res {
Ok(_) => {
gst::log!(CAT, obj: &self.element, "Successfully pushed item");
}
Err(gst::FlowError::Eos) => {
gst::debug!(CAT, obj: &self.element, "EOS");
pad.push_event(gst::event::Eos::new()).await;
let appsrc = self.element.imp();
appsrc.src_pad.push_event(gst::event::Eos::new()).await;
}
Err(gst::FlowError::Flushing) => {
gst::debug!(CAT, obj: &self.element, "Flushing");
@ -344,7 +285,8 @@ impl TaskImpl for AppSrcTask {
gst::log!(CAT, obj: &self.element, "Stopping task");
self.flush();
self.src_pad_handler.reset_state().await;
self.need_initial_events = true;
self.need_segment = true;
gst::log!(CAT, obj: &self.element, "Task stopped");
Ok(())
@ -357,7 +299,7 @@ impl TaskImpl for AppSrcTask {
gst::log!(CAT, obj: &self.element, "Starting task flush");
self.flush();
self.src_pad_handler.set_need_segment().await;
self.need_segment = true;
gst::log!(CAT, obj: &self.element, "Task flush started");
Ok(())
@ -369,10 +311,10 @@ impl TaskImpl for AppSrcTask {
#[derive(Debug)]
pub struct AppSrc {
src_pad: PadSrc,
src_pad_handler: AppSrcPadHandler,
task: Task,
sender: StdMutex<Option<mpsc::Sender<StreamItem>>>,
settings: StdMutex<Settings>,
sender: Mutex<Option<mpsc::Sender<StreamItem>>>,
configured_caps: Mutex<Option<gst::Caps>>,
settings: Mutex<Settings>,
}
impl AppSrc {
@ -429,9 +371,9 @@ impl AppSrc {
}
fn prepare(&self, element: &super::AppSrc) -> Result<(), gst::ErrorMessage> {
let settings = self.settings.lock().unwrap();
gst::debug!(CAT, obj: element, "Preparing");
let settings = self.settings.lock().unwrap();
let context =
Context::acquire(&settings.context, settings.context_wait).map_err(|err| {
gst::error_msg!(
@ -439,24 +381,21 @@ impl AppSrc {
["Failed to acquire Context: {}", err]
)
})?;
let max_buffers = settings.max_buffers.try_into().map_err(|err| {
gst::error_msg!(
gst::ResourceError::Settings,
["Invalid max-buffers: {}, {}", settings.max_buffers, err]
)
})?;
drop(settings);
*self.configured_caps.lock().unwrap() = None;
let (sender, receiver) = mpsc::channel(max_buffers);
*self.sender.lock().unwrap() = Some(sender);
self.src_pad_handler.prepare(settings.caps.clone());
self.task
.prepare(
AppSrcTask::new(element, &self.src_pad, &self.src_pad_handler, receiver),
context,
)
.prepare(AppSrcTask::new(element.clone(), receiver), context)
.map_err(|err| {
gst::error_msg!(
gst::ResourceError::OpenRead,
@ -507,17 +446,15 @@ impl ObjectSubclass for AppSrc {
type ParentType = gst::Element;
fn with_class(klass: &Self::Class) -> Self {
let src_pad_handler = AppSrcPadHandler::default();
Self {
src_pad: PadSrc::new(
gst::Pad::from_template(&klass.pad_template("src").unwrap(), Some("src")),
src_pad_handler.clone(),
AppSrcPadHandler,
),
src_pad_handler,
task: Task::default(),
sender: StdMutex::new(None),
settings: StdMutex::new(Settings::default()),
sender: Default::default(),
configured_caps: Default::default(),
settings: Default::default(),
}
}
}