ts: migrate elements to try_next / handle_item

See previous commit for details.

Also switched to panicking for some programming errors.
This commit is contained in:
François Laignel 2022-08-10 20:10:08 +02:00
parent 8b54c3fed6
commit 33e601d33e
9 changed files with 224 additions and 183 deletions

View file

@ -238,19 +238,20 @@ impl AppSrcTask {
}
impl TaskImpl for AppSrcTask {
fn iterate(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> {
async move {
let item = self.receiver.next().await.ok_or_else(|| {
gst::error!(CAT, obj: &self.element, "SrcPad channel aborted");
gst::element_error!(
&self.element,
gst::StreamError::Failed,
("Internal data stream error"),
["streaming stopped, reason: channel aborted"]
);
gst::FlowError::Flushing
})?;
type Item = StreamItem;
fn try_next(&mut self) -> BoxFuture<'_, Result<StreamItem, gst::FlowError>> {
async move {
self.receiver
.next()
.await
.ok_or_else(|| panic!("Internal channel sender dropped while Task is Started"))
}
.boxed()
}
fn handle_item(&mut self, item: StreamItem) -> BoxFuture<'_, Result<(), gst::FlowError>> {
async move {
let res = self.push_item(item).await;
match res {
Ok(_) => {
@ -428,7 +429,7 @@ impl AppSrc {
fn pause(&self, element: &super::AppSrc) -> Result<(), gst::ErrorMessage> {
gst::debug!(CAT, obj: element, "Pausing");
let _ = self.task.pause().check()?;
self.task.pause().block_on()?;
gst::debug!(CAT, obj: element, "Paused");
Ok(())
}

View file

@ -1066,6 +1066,8 @@ impl JitterBufferTask {
}
impl TaskImpl for JitterBufferTask {
type Item = ();
fn start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst::log!(CAT, obj: &self.element, "Starting task");
@ -1087,7 +1089,17 @@ impl TaskImpl for JitterBufferTask {
.boxed()
}
fn iterate(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> {
// FIXME this function was migrated to the try_next / handle_item model
// but hasn't been touched as there are pending changes to jitterbuffer
// in https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/756.
// It should be possible to remove the loop below as try_next / handle_item
// are executed in a loop by the Task state machine.
// It should also be possible to store latency and context_wait as
// fields of JitterBufferTask so as to avoid locking the settings.
// If latency can change during processing, a command based mechanism
// could be implemented. See the command implemention for ts-udpsink as
// an example.
fn try_next(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> {
async move {
let jb = self.element.imp();
let (latency, context_wait) = {
@ -1223,6 +1235,10 @@ impl TaskImpl for JitterBufferTask {
.boxed()
}
fn handle_item(&mut self, _item: ()) -> BoxFuture<'_, Result<(), gst::FlowError>> {
future::ok(()).boxed()
}
fn stop(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst::log!(CAT, obj: &self.element, "Stopping task");

View file

@ -858,6 +858,8 @@ impl ProxySrcTask {
}
impl TaskImpl for ProxySrcTask {
type Item = DataQueueItem;
fn start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst::log!(SRC_CAT, obj: &self.element, "Starting task");
@ -880,18 +882,18 @@ impl TaskImpl for ProxySrcTask {
.boxed()
}
fn iterate(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> {
fn try_next(&mut self) -> BoxFuture<'_, Result<DataQueueItem, gst::FlowError>> {
async move {
let item = self.dataqueue.next().await;
let item = match item {
Some(item) => item,
None => {
gst::log!(SRC_CAT, obj: &self.element, "DataQueue Stopped");
return Err(gst::FlowError::Flushing);
}
};
self.dataqueue
.next()
.await
.ok_or_else(|| panic!("DataQueue stopped while Task is Started"))
}
.boxed()
}
fn handle_item(&mut self, item: DataQueueItem) -> BoxFuture<'_, Result<(), gst::FlowError>> {
async move {
let res = self.push_item(item).await;
let proxysrc = self.element.imp();
match res {
@ -1085,7 +1087,7 @@ impl ProxySrc {
fn pause(&self, element: &super::ProxySrc) -> Result<(), gst::ErrorMessage> {
gst::debug!(SRC_CAT, obj: element, "Pausing");
let _ = self.task.pause().check()?;
self.task.pause().block_on()?;
gst::debug!(SRC_CAT, obj: element, "Paused");
Ok(())
}

View file

@ -328,6 +328,8 @@ impl QueueTask {
}
impl TaskImpl for QueueTask {
type Item = DataQueueItem;
fn start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst::log!(CAT, obj: &self.element, "Starting task");
@ -345,18 +347,18 @@ impl TaskImpl for QueueTask {
.boxed()
}
fn iterate(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> {
fn try_next(&mut self) -> BoxFuture<'_, Result<DataQueueItem, gst::FlowError>> {
async move {
let item = self.dataqueue.next().await;
let item = match item {
Some(item) => item,
None => {
gst::log!(CAT, obj: &self.element, "DataQueue Stopped");
return Err(gst::FlowError::Flushing);
}
};
self.dataqueue
.next()
.await
.ok_or_else(|| panic!("DataQueue stopped while Task is Started"))
}
.boxed()
}
fn handle_item(&mut self, item: DataQueueItem) -> BoxFuture<'_, Result<(), gst::FlowError>> {
async {
let res = self.push_item(item).await;
let queue = self.element.imp();
match res {

View file

@ -117,13 +117,13 @@ impl fmt::Display for SocketError {
}
}
pub type SocketStreamItem = Result<(gst::Buffer, Option<std::net::SocketAddr>), SocketError>;
impl<T: SocketRead> Socket<T> {
// Can't implement this as a Stream trait because we end up using things like
// tokio::net::UdpSocket which don't implement pollable functions.
#[allow(clippy::should_implement_trait)]
pub async fn next(&mut self) -> Option<SocketStreamItem> {
pub async fn try_next(
&mut self,
) -> Result<(gst::Buffer, Option<std::net::SocketAddr>), SocketError> {
gst::log!(SOCKET_CAT, obj: &self.element, "Trying to read data");
if self.mapped_buffer.is_none() {
@ -133,7 +133,7 @@ impl<T: SocketRead> Socket<T> {
}
Err(err) => {
gst::debug!(SOCKET_CAT, obj: &self.element, "Failed to acquire buffer {:?}", err);
return Some(Err(SocketError::Gst(err)));
return Err(SocketError::Gst(err));
}
}
}
@ -172,12 +172,12 @@ impl<T: SocketRead> Socket<T> {
buffer.set_dts(dts);
}
Some(Ok((buffer, saddr)))
Ok((buffer, saddr))
}
Err(err) => {
gst::debug!(SOCKET_CAT, obj: &self.element, "Read error {:?}", err);
Some(Err(SocketError::Io(err)))
Err(SocketError::Io(err))
}
}
}

View file

@ -278,6 +278,8 @@ impl TcpClientSrcTask {
}
impl TaskImpl for TcpClientSrcTask {
type Item = gst::Buffer;
fn prepare(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst::log!(CAT, obj: &self.element, "Preparing task connecting to {:?}", self.saddr);
@ -331,41 +333,44 @@ impl TaskImpl for TcpClientSrcTask {
.boxed()
}
fn iterate(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> {
fn try_next(&mut self) -> BoxFuture<'_, Result<gst::Buffer, gst::FlowError>> {
async move {
let item = self.socket.as_mut().unwrap().next().await.ok_or_else(|| {
gst::log!(CAT, obj: &self.element, "SocketStream Stopped");
gst::FlowError::Flushing
})?;
let (buffer, _) = item.map_err(|err| {
gst::error!(CAT, obj: &self.element, "Got error {:?}", err);
match err {
SocketError::Gst(err) => {
gst::element_error!(
self.element,
gst::StreamError::Failed,
("Internal data stream error"),
["streaming stopped, reason {}", err]
);
self.socket
.as_mut()
.unwrap()
.try_next()
.await
.map(|(buffer, _saddr)| buffer)
.map_err(|err| {
gst::error!(CAT, obj: &self.element, "Got error {:?}", err);
match err {
SocketError::Gst(err) => {
gst::element_error!(
self.element,
gst::StreamError::Failed,
("Internal data stream error"),
["streaming stopped, reason {}", err]
);
}
SocketError::Io(err) => {
gst::element_error!(
self.element,
gst::StreamError::Failed,
("I/O error"),
["streaming stopped, I/O error {}", err]
);
}
}
SocketError::Io(err) => {
gst::element_error!(
self.element,
gst::StreamError::Failed,
("I/O error"),
["streaming stopped, I/O error {}", err]
);
}
}
gst::FlowError::Error
})?;
self.push_buffer(buffer).await.map(drop)
gst::FlowError::Error
})
}
.boxed()
}
fn handle_item(&mut self, buffer: gst::Buffer) -> BoxFuture<'_, Result<(), gst::FlowError>> {
self.push_buffer(buffer).map_ok(drop).boxed()
}
fn stop(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst::log!(CAT, obj: &self.element, "Stopping task");
@ -486,7 +491,7 @@ impl TcpClientSrc {
fn pause(&self, element: &super::TcpClientSrc) -> Result<(), gst::ErrorMessage> {
gst::debug!(CAT, obj: element, "Pausing");
let _ = self.task.pause().check()?;
self.task.pause().block_on()?;
gst::debug!(CAT, obj: element, "Paused");
Ok(())
}

View file

@ -691,6 +691,8 @@ impl UdpSinkTask {
}
impl TaskImpl for UdpSinkTask {
type Item = TaskItem;
fn prepare(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst::info!(CAT, obj: &self.element, "Preparing Task");
@ -725,17 +727,25 @@ impl TaskImpl for UdpSinkTask {
.boxed()
}
fn iterate(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> {
fn try_next(&mut self) -> BoxFuture<'_, Result<TaskItem, gst::FlowError>> {
async move {
let item = futures::select_biased! {
cmd = self.cmd_receiver.recv_async() => {
self.process_command(cmd.unwrap());
return Ok(());
loop {
futures::select_biased! {
cmd = self.cmd_receiver.recv_async() => {
self.process_command(cmd.unwrap());
}
item = self.item_receiver.recv_async() => {
break item.map_err(|_| panic!("Internal channel sender dropped while Task is Started"))
}
}
item = self.item_receiver.recv_async() => item,
};
}
}
.boxed()
}
match item.map_err(|_| gst::FlowError::Flushing)? {
fn handle_item(&mut self, item: TaskItem) -> BoxFuture<'_, Result<(), gst::FlowError>> {
async move {
match item {
TaskItem::Buffer(buffer) => self.render(buffer).await.map_err(|err| {
element_error!(
&self.element,

View file

@ -208,67 +208,11 @@ impl UdpSrcTask {
need_segment: true,
}
}
async fn push_buffer(
&mut self,
buffer: gst::Buffer,
) -> Result<gst::FlowSuccess, gst::FlowError> {
gst::log!(CAT, obj: &self.element, "Handling {:?}", buffer);
let udpsrc = self.element.imp();
if self.need_initial_events {
gst::debug!(CAT, obj: &self.element, "Pushing initial events");
let stream_id = format!("{:08x}{:08x}", rand::random::<u32>(), rand::random::<u32>());
let stream_start_evt = gst::event::StreamStart::builder(&stream_id)
.group_id(gst::GroupId::next())
.build();
udpsrc.src_pad.push_event(stream_start_evt).await;
let caps = udpsrc.settings.lock().unwrap().caps.clone();
if let Some(caps) = caps {
udpsrc
.src_pad
.push_event(gst::event::Caps::new(&caps))
.await;
*udpsrc.configured_caps.lock().unwrap() = Some(caps);
}
self.need_initial_events = false;
}
if self.need_segment {
let segment_evt =
gst::event::Segment::new(&gst::FormattedSegment::<gst::format::Time>::new());
udpsrc.src_pad.push_event(segment_evt).await;
self.need_segment = false;
}
let res = udpsrc.src_pad.push(buffer).await;
match res {
Ok(_) => gst::log!(CAT, obj: &self.element, "Successfully pushed buffer"),
Err(gst::FlowError::Flushing) => gst::debug!(CAT, obj: &self.element, "Flushing"),
Err(gst::FlowError::Eos) => {
gst::debug!(CAT, obj: &self.element, "EOS");
udpsrc.src_pad.push_event(gst::event::Eos::new()).await;
}
Err(err) => {
gst::error!(CAT, obj: &self.element, "Got error {}", err);
gst::element_error!(
self.element,
gst::StreamError::Failed,
("Internal data stream error"),
["streaming stopped, reason {}", err]
);
}
}
res
}
}
impl TaskImpl for UdpSrcTask {
type Item = gst::Buffer;
fn prepare(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
let udpsrc = self.element.imp();
@ -489,46 +433,105 @@ impl TaskImpl for UdpSrcTask {
.boxed()
}
fn iterate(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> {
fn try_next(&mut self) -> BoxFuture<'_, Result<gst::Buffer, gst::FlowError>> {
async move {
let item = self.socket.as_mut().unwrap().next().await.ok_or_else(|| {
gst::log!(CAT, obj: &self.element, "SocketStream Stopped");
gst::FlowError::Flushing
})?;
self.socket
.as_mut()
.unwrap()
.try_next()
.await
.map(|(mut buffer, saddr)| {
if let Some(saddr) = saddr {
if self.retrieve_sender_address {
NetAddressMeta::add(
buffer.get_mut().unwrap(),
&gio::InetSocketAddress::from(saddr),
);
}
}
buffer
})
.map_err(|err| {
gst::error!(CAT, obj: &self.element, "Got error {:?}", err);
match err {
SocketError::Gst(err) => {
gst::element_error!(
self.element,
gst::StreamError::Failed,
("Internal data stream error"),
["streaming stopped, reason {}", err]
);
}
SocketError::Io(err) => {
gst::element_error!(
self.element,
gst::StreamError::Failed,
("I/O error"),
["streaming stopped, I/O error {}", err]
);
}
}
gst::FlowError::Error
})
}
.boxed()
}
let (mut buffer, saddr) = item.map_err(|err| {
gst::error!(CAT, obj: &self.element, "Got error {:?}", err);
match err {
SocketError::Gst(err) => {
gst::element_error!(
self.element,
gst::StreamError::Failed,
("Internal data stream error"),
["streaming stopped, reason {}", err]
);
}
SocketError::Io(err) => {
gst::element_error!(
self.element,
gst::StreamError::Failed,
("I/O error"),
["streaming stopped, I/O error {}", err]
);
}
fn handle_item(&mut self, buffer: gst::Buffer) -> BoxFuture<'_, Result<(), gst::FlowError>> {
async {
gst::log!(CAT, obj: &self.element, "Handling {:?}", buffer);
let udpsrc = self.element.imp();
if self.need_initial_events {
gst::debug!(CAT, obj: &self.element, "Pushing initial events");
let stream_id =
format!("{:08x}{:08x}", rand::random::<u32>(), rand::random::<u32>());
let stream_start_evt = gst::event::StreamStart::builder(&stream_id)
.group_id(gst::GroupId::next())
.build();
udpsrc.src_pad.push_event(stream_start_evt).await;
let caps = udpsrc.settings.lock().unwrap().caps.clone();
if let Some(caps) = caps {
udpsrc
.src_pad
.push_event(gst::event::Caps::new(&caps))
.await;
*udpsrc.configured_caps.lock().unwrap() = Some(caps);
}
gst::FlowError::Error
})?;
if let Some(saddr) = saddr {
if self.retrieve_sender_address {
NetAddressMeta::add(
buffer.get_mut().unwrap(),
&gio::InetSocketAddress::from(saddr),
self.need_initial_events = false;
}
if self.need_segment {
let segment_evt =
gst::event::Segment::new(&gst::FormattedSegment::<gst::format::Time>::new());
udpsrc.src_pad.push_event(segment_evt).await;
self.need_segment = false;
}
let res = udpsrc.src_pad.push(buffer).await.map(drop);
match res {
Ok(_) => gst::log!(CAT, obj: &self.element, "Successfully pushed buffer"),
Err(gst::FlowError::Flushing) => gst::debug!(CAT, obj: &self.element, "Flushing"),
Err(gst::FlowError::Eos) => {
gst::debug!(CAT, obj: &self.element, "EOS");
udpsrc.src_pad.push_event(gst::event::Eos::new()).await;
}
Err(err) => {
gst::error!(CAT, obj: &self.element, "Got error {}", err);
gst::element_error!(
self.element,
gst::StreamError::Failed,
("Internal data stream error"),
["streaming stopped, reason {}", err]
);
}
}
self.push_buffer(buffer).await.map(drop)
res
}
.boxed()
}
@ -616,7 +619,7 @@ impl UdpSrc {
fn pause(&self, element: &super::UdpSrc) -> Result<(), gst::ErrorMessage> {
gst::debug!(CAT, obj: element, "Pausing");
let _ = self.task.pause().check()?;
self.task.pause().block_on()?;
gst::debug!(CAT, obj: element, "Paused");
Ok(())
}

View file

@ -155,19 +155,21 @@ mod imp_src {
}
impl TaskImpl for ElementSrcTestTask {
fn iterate(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> {
type Item = Item;
fn try_next(&mut self) -> BoxFuture<'_, Result<Item, gst::FlowError>> {
async move {
let item = self.receiver.next().await;
self.receiver.next().await.ok_or_else(|| {
gst::log!(SRC_CAT, obj: &self.element, "SrcPad channel aborted");
gst::FlowError::Eos
})
}
.boxed()
}
let item = match item {
Some(item) => item,
None => {
gst::log!(SRC_CAT, obj: &self.element, "SrcPad channel aborted");
return Err(gst::FlowError::Eos);
}
};
let res = self.push_item(item).await;
fn handle_item(&mut self, item: Item) -> BoxFuture<'_, Result<(), gst::FlowError>> {
async move {
let res = self.push_item(item).await.map(drop);
match res {
Ok(_) => gst::log!(SRC_CAT, obj: &self.element, "Successfully pushed item"),
Err(gst::FlowError::Flushing) => {
@ -176,7 +178,7 @@ mod imp_src {
Err(err) => panic!("Got error {}", err),
}
res.map(drop)
res
}
.boxed()
}
@ -275,7 +277,7 @@ mod imp_src {
fn pause(&self, element: &super::ElementSrcTest) {
gst::debug!(SRC_CAT, obj: element, "Pausing");
let _ = self.task.pause().check().unwrap();
self.task.pause().block_on().unwrap();
gst::debug!(SRC_CAT, obj: element, "Paused");
}
}