threadshare: Handle end of stream for sources

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1581>
This commit is contained in:
Sanchayan Maity 2024-05-20 15:43:59 +05:30
parent 6538803cf6
commit cd47bf2f04
2 changed files with 284 additions and 66 deletions

View file

@ -40,6 +40,8 @@ use crate::runtime::{Context, PadSrc, Task, TaskState};
use crate::runtime::Async;
use crate::socket::{Socket, SocketError, SocketRead};
use futures::channel::mpsc::{channel, Receiver, Sender};
use futures::pin_mut;
const DEFAULT_HOST: Option<&str> = Some("127.0.0.1");
const DEFAULT_PORT: i32 = 4953;
@ -48,6 +50,11 @@ const DEFAULT_BLOCKSIZE: u32 = 4096;
const DEFAULT_CONTEXT: &str = "";
const DEFAULT_CONTEXT_WAIT: Duration = Duration::ZERO;
#[derive(Debug, Default)]
struct State {
event_sender: Option<Sender<gst::Event>>,
}
#[derive(Debug, Clone)]
struct Settings {
host: Option<String>,
@ -166,10 +173,16 @@ struct TcpClientSrcTask {
socket: Option<Socket<TcpClientReader>>,
need_initial_events: bool,
need_segment: bool,
event_receiver: Receiver<gst::Event>,
}
impl TcpClientSrcTask {
fn new(element: super::TcpClientSrc, saddr: SocketAddr, buffer_pool: gst::BufferPool) -> Self {
fn new(
element: super::TcpClientSrc,
saddr: SocketAddr,
buffer_pool: gst::BufferPool,
event_receiver: Receiver<gst::Event>,
) -> Self {
TcpClientSrcTask {
element,
saddr,
@ -177,6 +190,7 @@ impl TcpClientSrcTask {
socket: None,
need_initial_events: true,
need_segment: true,
event_receiver,
}
}
@ -313,34 +327,58 @@ impl TaskImpl for TcpClientSrcTask {
fn try_next(&mut self) -> BoxFuture<'_, Result<gst::Buffer, gst::FlowError>> {
async move {
self.socket
.as_mut()
.unwrap()
.try_next()
.await
.map(|(buffer, _saddr)| buffer)
.map_err(|err| {
gst::error!(CAT, obj: self.element, "Got error {:?}", err);
match err {
SocketError::Gst(err) => {
gst::element_error!(
self.element,
gst::StreamError::Failed,
("Internal data stream error"),
["streaming stopped, reason {}", err]
);
}
SocketError::Io(err) => {
gst::element_error!(
self.element,
gst::StreamError::Failed,
("I/O error"),
["streaming stopped, I/O error {}", err]
);
let event_fut = self.event_receiver.next().fuse();
let socket_fut = self.socket.as_mut().unwrap().try_next().fuse();
pin_mut!(event_fut);
pin_mut!(socket_fut);
futures::select! {
event_res = event_fut => match event_res {
Some(event) => {
gst::debug!(CAT, obj: self.element, "Handling element level event {event:?}");
match event.view() {
gst::EventView::Eos(_) => Err(gst::FlowError::Eos),
ev => {
gst::error!(CAT, obj: self.element, "Unexpected event {ev:?} on channel");
Err(gst::FlowError::Error)
}
}
}
gst::FlowError::Error
})
None => {
gst::error!(CAT, obj: self.element, "Unexpected return on event channel");
Err(gst::FlowError::Error)
}
},
socket_res = socket_fut => match socket_res {
Ok((buffer, _saddr)) => Ok(buffer),
Err(err) => {
gst::error!(CAT, obj: self.element, "Got error {err:#}");
match err {
SocketError::Gst(err) => {
gst::element_error!(
self.element,
gst::StreamError::Failed,
("Internal data stream error"),
["streaming stopped, reason {err}"]
);
}
SocketError::Io(err) => {
gst::element_error!(
self.element,
gst::StreamError::Failed,
("I/O error"),
["streaming stopped, I/O error {err}"]
);
}
}
Err(gst::FlowError::Error)
}
},
}
}
.boxed()
}
@ -368,6 +406,40 @@ impl TaskImpl for TcpClientSrcTask {
}
.boxed()
}
fn handle_loop_error(&mut self, err: gst::FlowError) -> BoxFuture<'_, task::Trigger> {
async move {
match err {
gst::FlowError::Flushing => {
gst::debug!(CAT, obj: self.element, "Flushing");
task::Trigger::FlushStart
}
gst::FlowError::Eos => {
gst::debug!(CAT, obj: self.element, "EOS");
self.element
.imp()
.src_pad
.push_event(gst::event::Eos::new())
.await;
task::Trigger::Stop
}
err => {
gst::error!(CAT, obj: self.element, "Got error {err}");
gst::element_error!(
&self.element,
gst::StreamError::Failed,
("Internal data stream error"),
["streaming stopped, reason {}", err]
);
task::Trigger::Error
}
}
}
.boxed()
}
}
pub struct TcpClientSrc {
@ -375,6 +447,7 @@ pub struct TcpClientSrc {
task: Task,
configured_caps: Mutex<Option<gst::Caps>>,
settings: Mutex<Settings>,
state: Mutex<State>,
}
static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
@ -431,18 +504,24 @@ impl TcpClientSrc {
let saddr = SocketAddr::new(host, port as u16);
let (sender, receiver) = channel(1);
// Don't block on `prepare` as the socket connection takes time.
// This will be performed in the background and we'll block on
// `start` which will also ensure `prepare` completed successfully.
let fut = self
.task
.prepare(
TcpClientSrcTask::new(self.obj().clone(), saddr, buffer_pool),
TcpClientSrcTask::new(self.obj().clone(), saddr, buffer_pool, receiver),
context,
)
.check()?;
drop(fut);
let mut state = self.state.lock().unwrap();
state.event_sender = Some(sender);
drop(state);
gst::debug!(CAT, imp: self, "Preparing asynchronously");
Ok(())
@ -474,6 +553,10 @@ impl TcpClientSrc {
gst::debug!(CAT, imp: self, "Paused");
Ok(())
}
fn state(&self) -> TaskState {
self.task.state()
}
}
#[glib::object_subclass]
@ -491,6 +574,7 @@ impl ObjectSubclass for TcpClientSrc {
task: Task::default(),
configured_caps: Default::default(),
settings: Default::default(),
state: Default::default(),
}
}
}
@ -664,4 +748,31 @@ impl ElementImpl for TcpClientSrc {
Ok(success)
}
fn send_event(&self, event: gst::Event) -> bool {
use gst::EventView;
gst::debug!(CAT, imp: self, "Handling element level event {event:?}");
match event.view() {
EventView::Eos(_) => {
if self.state() != TaskState::Started {
if let Err(err) = self.start() {
gst::error!(CAT, imp: self, "Failed to start task thread {err:?}");
}
}
if self.state() == TaskState::Started {
let mut state = self.state.lock().unwrap();
if let Some(event_tx) = state.event_sender.as_mut() {
return event_tx.try_send(event.clone()).is_ok();
}
}
false
}
_ => self.parent_send_event(event),
}
}
}

View file

@ -35,9 +35,11 @@ use std::time::Duration;
use std::u16;
use crate::runtime::prelude::*;
use crate::runtime::{Async, Context, PadSrc, Task};
use crate::runtime::{task, Async, Context, PadSrc, Task, TaskState};
use crate::socket::{wrap_socket, GioSocketWrapper, Socket, SocketError, SocketRead};
use futures::channel::mpsc::{channel, Receiver, Sender};
use futures::pin_mut;
const DEFAULT_ADDRESS: Option<&str> = Some("0.0.0.0");
const DEFAULT_PORT: i32 = 5004;
@ -50,6 +52,11 @@ const DEFAULT_CONTEXT: &str = "";
const DEFAULT_CONTEXT_WAIT: Duration = Duration::ZERO;
const DEFAULT_RETRIEVE_SENDER_ADDRESS: bool = true;
#[derive(Debug, Default)]
struct State {
event_sender: Option<Sender<gst::Event>>,
}
#[derive(Debug, Clone)]
struct Settings {
address: Option<String>,
@ -182,16 +189,18 @@ struct UdpSrcTask {
retrieve_sender_address: bool,
need_initial_events: bool,
need_segment: bool,
event_receiver: Receiver<gst::Event>,
}
impl UdpSrcTask {
fn new(element: super::UdpSrc) -> Self {
fn new(element: super::UdpSrc, event_receiver: Receiver<gst::Event>) -> Self {
UdpSrcTask {
element,
socket: None,
retrieve_sender_address: DEFAULT_RETRIEVE_SENDER_ADDRESS,
need_initial_events: true,
need_segment: true,
event_receiver,
}
}
}
@ -423,44 +432,69 @@ impl TaskImpl for UdpSrcTask {
fn try_next(&mut self) -> BoxFuture<'_, Result<gst::Buffer, gst::FlowError>> {
async move {
self.socket
.as_mut()
.unwrap()
.try_next()
.await
.map(|(mut buffer, saddr)| {
if let Some(saddr) = saddr {
if self.retrieve_sender_address {
NetAddressMeta::add(
buffer.get_mut().unwrap(),
&gio::InetSocketAddress::from(saddr),
);
let event_fut = self.event_receiver.next().fuse();
let socket_fut = self.socket.as_mut().unwrap().try_next().fuse();
pin_mut!(event_fut);
pin_mut!(socket_fut);
futures::select! {
event_res = event_fut => match event_res {
Some(event) => {
gst::debug!(CAT, obj: self.element, "Handling element level event {event:?}");
match event.view() {
gst::EventView::Eos(_) => Err(gst::FlowError::Eos),
ev => {
gst::error!(CAT, obj: self.element, "Unexpected event {ev:?} on channel");
Err(gst::FlowError::Error)
}
}
}
buffer
})
.map_err(|err| {
gst::error!(CAT, obj: self.element, "Got error {:?}", err);
match err {
SocketError::Gst(err) => {
gst::element_error!(
self.element,
gst::StreamError::Failed,
("Internal data stream error"),
["streaming stopped, reason {}", err]
);
}
SocketError::Io(err) => {
gst::element_error!(
self.element,
gst::StreamError::Failed,
("I/O error"),
["streaming stopped, I/O error {}", err]
);
}
None => {
gst::error!(CAT, obj: self.element, "Unexpected return on event channel");
Err(gst::FlowError::Error)
}
gst::FlowError::Error
})
},
socket_res = socket_fut => match socket_res {
Ok((mut buffer, saddr)) => {
if let Some(saddr) = saddr {
if self.retrieve_sender_address {
NetAddressMeta::add(
buffer.get_mut().unwrap(),
&gio::InetSocketAddress::from(saddr),
);
}
}
Ok(buffer)
},
Err(err) => {
gst::error!(CAT, obj: self.element, "Got error {err:#}");
match err {
SocketError::Gst(err) => {
gst::element_error!(
self.element,
gst::StreamError::Failed,
("Internal data stream error"),
["streaming stopped, reason {err}"]
);
}
SocketError::Io(err) => {
gst::element_error!(
self.element,
gst::StreamError::Failed,
("I/O error"),
["streaming stopped, I/O error {err}"]
);
}
}
Err(gst::FlowError::Error)
}
},
}
}
.boxed()
}
@ -544,6 +578,40 @@ impl TaskImpl for UdpSrcTask {
}
.boxed()
}
fn handle_loop_error(&mut self, err: gst::FlowError) -> BoxFuture<'_, task::Trigger> {
async move {
match err {
gst::FlowError::Flushing => {
gst::debug!(CAT, obj: self.element, "Flushing");
task::Trigger::FlushStart
}
gst::FlowError::Eos => {
gst::debug!(CAT, obj: self.element, "EOS");
self.element
.imp()
.src_pad
.push_event(gst::event::Eos::new())
.await;
task::Trigger::Stop
}
err => {
gst::error!(CAT, obj: self.element, "Got error {err}");
gst::element_error!(
&self.element,
gst::StreamError::Failed,
("Internal data stream error"),
["streaming stopped, reason {}", err]
);
task::Trigger::Error
}
}
}
.boxed()
}
}
pub struct UdpSrc {
@ -551,6 +619,7 @@ pub struct UdpSrc {
task: Task,
configured_caps: Mutex<Option<gst::Caps>>,
settings: Mutex<Settings>,
state: Mutex<State>,
}
static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
@ -575,11 +644,17 @@ impl UdpSrc {
})?;
drop(settings);
let (sender, receiver) = channel(1);
*self.configured_caps.lock().unwrap() = None;
self.task
.prepare(UdpSrcTask::new(self.obj().clone()), context)
.prepare(UdpSrcTask::new(self.obj().clone(), receiver), context)
.block_on()?;
let mut state = self.state.lock().unwrap();
state.event_sender = Some(sender);
drop(state);
gst::debug!(CAT, imp: self, "Prepared");
Ok(())
@ -611,6 +686,10 @@ impl UdpSrc {
gst::debug!(CAT, imp: self, "Paused");
Ok(())
}
fn state(&self) -> TaskState {
self.task.state()
}
}
#[glib::object_subclass]
@ -628,6 +707,7 @@ impl ObjectSubclass for UdpSrc {
task: Task::default(),
configured_caps: Default::default(),
settings: Default::default(),
state: Default::default(),
}
}
}
@ -858,4 +938,31 @@ impl ElementImpl for UdpSrc {
Ok(success)
}
fn send_event(&self, event: gst::Event) -> bool {
use gst::EventView;
gst::debug!(CAT, imp: self, "Handling element level event {event:?}");
match event.view() {
EventView::Eos(_) => {
if self.state() != TaskState::Started {
if let Err(err) = self.start() {
gst::error!(CAT, imp: self, "Failed to start task thread {err:?}");
}
}
if self.state() == TaskState::Started {
let mut state = self.state.lock().unwrap();
if let Some(event_tx) = state.event_sender.as_mut() {
return event_tx.try_send(event.clone()).is_ok();
}
}
false
}
_ => self.parent_send_event(event),
}
}
}