ts/tcpclientsrc: reduce sync primitives in async hot path

This commit is contained in:
François Laignel 2022-06-26 22:00:01 +02:00 committed by Sebastian Dröge
parent 7e826385c7
commit d6a9106ffa

View file

@ -19,7 +19,6 @@
// SPDX-License-Identifier: LGPL-2.1-or-later // SPDX-License-Identifier: LGPL-2.1-or-later
use futures::future::BoxFuture; use futures::future::BoxFuture;
use futures::lock::Mutex as FutMutex;
use futures::prelude::*; use futures::prelude::*;
use gst::glib; use gst::glib;
@ -30,15 +29,14 @@ use once_cell::sync::Lazy;
use std::io; use std::io;
use std::net::{IpAddr, SocketAddr, TcpStream}; use std::net::{IpAddr, SocketAddr, TcpStream};
use std::sync::Arc; use std::sync::Mutex;
use std::sync::Mutex as StdMutex;
use std::time::Duration; use std::time::Duration;
use std::u16; use std::u16;
use std::u32; use std::u32;
use crate::runtime::prelude::*; use crate::runtime::prelude::*;
use crate::runtime::task; use crate::runtime::task;
use crate::runtime::{Context, PadSrc, PadSrcRef, PadSrcWeak, Task, TaskState}; use crate::runtime::{Context, PadSrc, PadSrcRef, Task, TaskState};
use crate::runtime::Async; use crate::runtime::Async;
use crate::socket::{Socket, SocketError, SocketRead}; use crate::socket::{Socket, SocketError, SocketRead};
@ -92,95 +90,8 @@ impl SocketRead for TcpClientReader {
} }
} }
#[derive(Debug)] #[derive(Clone, Debug)]
struct TcpClientSrcPadHandlerState { struct TcpClientSrcPadHandler;
need_initial_events: bool,
need_segment: bool,
caps: Option<gst::Caps>,
}
impl Default for TcpClientSrcPadHandlerState {
fn default() -> Self {
TcpClientSrcPadHandlerState {
need_initial_events: true,
need_segment: true,
caps: None,
}
}
}
#[derive(Debug, Default)]
struct TcpClientSrcPadHandlerInner {
state: FutMutex<TcpClientSrcPadHandlerState>,
configured_caps: StdMutex<Option<gst::Caps>>,
}
#[derive(Clone, Debug, Default)]
struct TcpClientSrcPadHandler(Arc<TcpClientSrcPadHandlerInner>);
impl TcpClientSrcPadHandler {
fn prepare(&self, caps: Option<gst::Caps>) {
self.0
.state
.try_lock()
.expect("State locked elsewhere")
.caps = caps;
}
async fn reset_state(&self) {
*self.0.configured_caps.lock().unwrap() = None;
}
async fn set_need_segment(&self) {
self.0.state.lock().await.need_segment = true;
}
async fn push_prelude(&self, pad: &PadSrcRef<'_>, _element: &super::TcpClientSrc) {
let mut state = self.0.state.lock().await;
if state.need_initial_events {
gst::debug!(CAT, obj: pad.gst_pad(), "Pushing initial events");
let stream_id = format!("{:08x}{:08x}", rand::random::<u32>(), rand::random::<u32>());
let stream_start_evt = gst::event::StreamStart::builder(&stream_id)
.group_id(gst::GroupId::next())
.build();
pad.push_event(stream_start_evt).await;
if let Some(ref caps) = state.caps {
pad.push_event(gst::event::Caps::new(caps)).await;
*self.0.configured_caps.lock().unwrap() = Some(caps.clone());
}
state.need_initial_events = false;
}
if state.need_segment {
let segment_evt =
gst::event::Segment::new(&gst::FormattedSegment::<gst::format::Time>::new());
pad.push_event(segment_evt).await;
state.need_segment = false;
}
}
async fn push_buffer(
&self,
pad: &PadSrcRef<'_>,
element: &super::TcpClientSrc,
buffer: gst::Buffer,
) -> Result<gst::FlowSuccess, gst::FlowError> {
gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", buffer);
self.push_prelude(pad, element).await;
if buffer.size() == 0 {
pad.push_event(gst::event::Eos::new()).await;
return Ok(gst::FlowSuccess::Ok);
}
pad.push(buffer).await
}
}
impl PadSrcHandler for TcpClientSrcPadHandler { impl PadSrcHandler for TcpClientSrcPadHandler {
type ElementImpl = TcpClientSrc; type ElementImpl = TcpClientSrc;
@ -216,7 +127,7 @@ impl PadSrcHandler for TcpClientSrcPadHandler {
fn src_query( fn src_query(
&self, &self,
pad: &PadSrcRef, pad: &PadSrcRef,
_tcpclientsrc: &TcpClientSrc, tcpclientsrc: &TcpClientSrc,
_element: &gst::Element, _element: &gst::Element,
query: &mut gst::QueryRef, query: &mut gst::QueryRef,
) -> bool { ) -> bool {
@ -234,7 +145,8 @@ impl PadSrcHandler for TcpClientSrcPadHandler {
true true
} }
QueryViewMut::Caps(q) => { QueryViewMut::Caps(q) => {
let caps = if let Some(caps) = self.0.configured_caps.lock().unwrap().as_ref() { let caps = if let Some(caps) = tcpclientsrc.configured_caps.lock().unwrap().as_ref()
{
q.filter() q.filter()
.map(|f| f.intersect_with_mode(caps, gst::CapsIntersectMode::First)) .map(|f| f.intersect_with_mode(caps, gst::CapsIntersectMode::First))
.unwrap_or_else(|| caps.clone()) .unwrap_or_else(|| caps.clone())
@ -263,30 +175,98 @@ impl PadSrcHandler for TcpClientSrcPadHandler {
struct TcpClientSrcTask { struct TcpClientSrcTask {
element: super::TcpClientSrc, element: super::TcpClientSrc,
src_pad: PadSrcWeak,
src_pad_handler: TcpClientSrcPadHandler,
saddr: SocketAddr, saddr: SocketAddr,
buffer_pool: Option<gst::BufferPool>, buffer_pool: Option<gst::BufferPool>,
socket: Option<Socket<TcpClientReader>>, socket: Option<Socket<TcpClientReader>>,
need_initial_events: bool,
need_segment: bool,
} }
impl TcpClientSrcTask { impl TcpClientSrcTask {
fn new( fn new(element: super::TcpClientSrc, saddr: SocketAddr, buffer_pool: gst::BufferPool) -> Self {
element: &super::TcpClientSrc,
src_pad: &PadSrc,
src_pad_handler: &TcpClientSrcPadHandler,
saddr: SocketAddr,
buffer_pool: gst::BufferPool,
) -> Self {
TcpClientSrcTask { TcpClientSrcTask {
element: element.clone(), element,
src_pad: src_pad.downgrade(),
src_pad_handler: src_pad_handler.clone(),
saddr, saddr,
buffer_pool: Some(buffer_pool), buffer_pool: Some(buffer_pool),
socket: None, socket: None,
need_initial_events: true,
need_segment: true,
} }
} }
async fn push_buffer(
&mut self,
buffer: gst::Buffer,
) -> Result<gst::FlowSuccess, gst::FlowError> {
gst::log!(CAT, obj: &self.element, "Handling {:?}", buffer);
let tcpclientsrc = self.element.imp();
if self.need_initial_events {
gst::debug!(CAT, obj: &self.element, "Pushing initial events");
let stream_id = format!("{:08x}{:08x}", rand::random::<u32>(), rand::random::<u32>());
let stream_start_evt = gst::event::StreamStart::builder(&stream_id)
.group_id(gst::GroupId::next())
.build();
tcpclientsrc.src_pad.push_event(stream_start_evt).await;
let caps = tcpclientsrc.settings.lock().unwrap().caps.clone();
if let Some(caps) = caps {
tcpclientsrc
.src_pad
.push_event(gst::event::Caps::new(&caps))
.await;
*tcpclientsrc.configured_caps.lock().unwrap() = Some(caps);
}
self.need_initial_events = false;
}
if self.need_segment {
let segment_evt =
gst::event::Segment::new(&gst::FormattedSegment::<gst::format::Time>::new());
tcpclientsrc.src_pad.push_event(segment_evt).await;
self.need_segment = false;
}
if buffer.size() == 0 {
tcpclientsrc
.src_pad
.push_event(gst::event::Eos::new())
.await;
return Ok(gst::FlowSuccess::Ok);
}
let res = tcpclientsrc.src_pad.push(buffer).await;
match res {
Ok(_) => {
gst::log!(CAT, obj: &self.element, "Successfully pushed buffer");
}
Err(gst::FlowError::Flushing) => {
gst::debug!(CAT, obj: &self.element, "Flushing");
}
Err(gst::FlowError::Eos) => {
gst::debug!(CAT, obj: &self.element, "EOS");
tcpclientsrc
.src_pad
.push_event(gst::event::Eos::new())
.await;
}
Err(err) => {
gst::error!(CAT, obj: &self.element, "Got error {}", err);
gst::element_error!(
self.element,
gst::StreamError::Failed,
("Internal data stream error"),
["streaming stopped, reason {}", err]
);
}
}
res
}
} }
impl TaskImpl for TcpClientSrcTask { impl TaskImpl for TcpClientSrcTask {
@ -345,66 +325,35 @@ impl TaskImpl for TcpClientSrcTask {
fn iterate(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> { fn iterate(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> {
async move { async move {
let item = self.socket.as_mut().unwrap().next().await; let item = self.socket.as_mut().unwrap().next().await.ok_or_else(|| {
gst::log!(CAT, obj: &self.element, "SocketStream Stopped");
gst::FlowError::Flushing
})?;
let buffer = match item { let (buffer, _) = item.map_err(|err| {
Some(Ok((buffer, _))) => buffer, gst::error!(CAT, obj: &self.element, "Got error {:?}", err);
Some(Err(err)) => { match err {
gst::error!(CAT, obj: &self.element, "Got error {:?}", err); SocketError::Gst(err) => {
match err { gst::element_error!(
SocketError::Gst(err) => { self.element,
gst::element_error!( gst::StreamError::Failed,
self.element, ("Internal data stream error"),
gst::StreamError::Failed, ["streaming stopped, reason {}", err]
("Internal data stream error"), );
["streaming stopped, reason {}", err] }
); SocketError::Io(err) => {
} gst::element_error!(
SocketError::Io(err) => { self.element,
gst::element_error!( gst::StreamError::Failed,
self.element, ("I/O error"),
gst::StreamError::Failed, ["streaming stopped, I/O error {}", err]
("I/O error"), );
["streaming stopped, I/O error {}", err]
);
}
} }
return Err(gst::FlowError::Error);
} }
None => { gst::FlowError::Error
gst::log!(CAT, obj: &self.element, "SocketStream Stopped"); })?;
return Err(gst::FlowError::Flushing);
}
};
let pad = self.src_pad.upgrade().expect("PadSrc no longer exists"); self.push_buffer(buffer).await.map(drop)
let res = self
.src_pad_handler
.push_buffer(&pad, &self.element, buffer)
.await;
match res {
Ok(_) => {
gst::log!(CAT, obj: &self.element, "Successfully pushed buffer");
}
Err(gst::FlowError::Flushing) => {
gst::debug!(CAT, obj: &self.element, "Flushing");
}
Err(gst::FlowError::Eos) => {
gst::debug!(CAT, obj: &self.element, "EOS");
pad.push_event(gst::event::Eos::new()).await;
}
Err(err) => {
gst::error!(CAT, obj: &self.element, "Got error {}", err);
gst::element_error!(
self.element,
gst::StreamError::Failed,
("Internal data stream error"),
["streaming stopped, reason {}", err]
);
}
}
res.map(drop)
} }
.boxed() .boxed()
} }
@ -412,7 +361,7 @@ impl TaskImpl for TcpClientSrcTask {
fn stop(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> { fn stop(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move { async move {
gst::log!(CAT, obj: &self.element, "Stopping task"); gst::log!(CAT, obj: &self.element, "Stopping task");
self.src_pad_handler.reset_state().await; self.need_initial_events = true;
gst::log!(CAT, obj: &self.element, "Task stopped"); gst::log!(CAT, obj: &self.element, "Task stopped");
Ok(()) Ok(())
} }
@ -422,7 +371,7 @@ impl TaskImpl for TcpClientSrcTask {
fn flush_stop(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> { fn flush_stop(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move { async move {
gst::log!(CAT, obj: &self.element, "Stopping task flush"); gst::log!(CAT, obj: &self.element, "Stopping task flush");
self.src_pad_handler.set_need_segment().await; self.need_initial_events = true;
gst::log!(CAT, obj: &self.element, "Task flush stopped"); gst::log!(CAT, obj: &self.element, "Task flush stopped");
Ok(()) Ok(())
} }
@ -432,9 +381,9 @@ impl TaskImpl for TcpClientSrcTask {
pub struct TcpClientSrc { pub struct TcpClientSrc {
src_pad: PadSrc, src_pad: PadSrc,
src_pad_handler: TcpClientSrcPadHandler,
task: Task, task: Task,
settings: StdMutex<Settings>, configured_caps: Mutex<Option<gst::Caps>>,
settings: Mutex<Settings>,
} }
static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| { static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
@ -447,9 +396,8 @@ static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
impl TcpClientSrc { impl TcpClientSrc {
fn prepare(&self, element: &super::TcpClientSrc) -> Result<(), gst::ErrorMessage> { fn prepare(&self, element: &super::TcpClientSrc) -> Result<(), gst::ErrorMessage> {
let settings = self.settings.lock().unwrap().clone();
gst::debug!(CAT, obj: element, "Preparing"); gst::debug!(CAT, obj: element, "Preparing");
let settings = self.settings.lock().unwrap().clone();
let context = let context =
Context::acquire(&settings.context, settings.context_wait).map_err(|err| { Context::acquire(&settings.context, settings.context_wait).map_err(|err| {
@ -459,6 +407,8 @@ impl TcpClientSrc {
) )
})?; })?;
*self.configured_caps.lock().unwrap() = None;
let host: IpAddr = match settings.host { let host: IpAddr = match settings.host {
None => { None => {
return Err(gst::error_msg!( return Err(gst::error_msg!(
@ -490,17 +440,9 @@ impl TcpClientSrc {
let saddr = SocketAddr::new(host, port as u16); let saddr = SocketAddr::new(host, port as u16);
self.src_pad_handler.prepare(settings.caps);
self.task self.task
.prepare( .prepare(
TcpClientSrcTask::new( TcpClientSrcTask::new(element.clone(), saddr, buffer_pool),
element,
&self.src_pad,
&self.src_pad_handler,
saddr,
buffer_pool,
),
context, context,
) )
.map_err(|err| { .map_err(|err| {
@ -550,16 +492,14 @@ impl ObjectSubclass for TcpClientSrc {
type ParentType = gst::Element; type ParentType = gst::Element;
fn with_class(klass: &Self::Class) -> Self { fn with_class(klass: &Self::Class) -> Self {
let src_pad_handler = TcpClientSrcPadHandler::default();
Self { Self {
src_pad: PadSrc::new( src_pad: PadSrc::new(
gst::Pad::from_template(&klass.pad_template("src").unwrap(), Some("src")), gst::Pad::from_template(&klass.pad_template("src").unwrap(), Some("src")),
src_pad_handler.clone(), TcpClientSrcPadHandler,
), ),
src_pad_handler,
task: Task::default(), task: Task::default(),
settings: StdMutex::new(Settings::default()), configured_caps: Default::default(),
settings: Default::default(),
} }
} }
} }