mirror of
https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git
synced 2024-11-26 13:31:00 +00:00
ts: update elements for new TransitionStatus
See previous commit
This commit is contained in:
parent
d4061774a4
commit
fb7929dda6
8 changed files with 81 additions and 107 deletions
|
@ -94,8 +94,8 @@ impl PadSrcHandler for AppSrcPadHandler {
|
||||||
gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", event);
|
gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", event);
|
||||||
|
|
||||||
let ret = match event.view() {
|
let ret = match event.view() {
|
||||||
EventView::FlushStart(..) => appsrc.task.flush_start().is_ok(),
|
EventView::FlushStart(..) => appsrc.task.flush_start().await_maybe_on_context().is_ok(),
|
||||||
EventView::FlushStop(..) => appsrc.task.flush_stop().is_ok(),
|
EventView::FlushStop(..) => appsrc.task.flush_stop().await_maybe_on_context().is_ok(),
|
||||||
EventView::Reconfigure(..) => true,
|
EventView::Reconfigure(..) => true,
|
||||||
EventView::Latency(..) => true,
|
EventView::Latency(..) => true,
|
||||||
_ => false,
|
_ => false,
|
||||||
|
@ -396,12 +396,7 @@ impl AppSrc {
|
||||||
|
|
||||||
self.task
|
self.task
|
||||||
.prepare(AppSrcTask::new(element.clone(), receiver), context)
|
.prepare(AppSrcTask::new(element.clone(), receiver), context)
|
||||||
.map_err(|err| {
|
.block_on()?;
|
||||||
gst::error_msg!(
|
|
||||||
gst::ResourceError::OpenRead,
|
|
||||||
["Error preparing Task: {:?}", err]
|
|
||||||
)
|
|
||||||
})?;
|
|
||||||
|
|
||||||
gst::debug!(CAT, obj: element, "Prepared");
|
gst::debug!(CAT, obj: element, "Prepared");
|
||||||
|
|
||||||
|
@ -412,28 +407,28 @@ impl AppSrc {
|
||||||
gst::debug!(CAT, obj: element, "Unpreparing");
|
gst::debug!(CAT, obj: element, "Unpreparing");
|
||||||
|
|
||||||
*self.sender.lock().unwrap() = None;
|
*self.sender.lock().unwrap() = None;
|
||||||
self.task.unprepare().unwrap();
|
self.task.unprepare().block_on().unwrap();
|
||||||
|
|
||||||
gst::debug!(CAT, obj: element, "Unprepared");
|
gst::debug!(CAT, obj: element, "Unprepared");
|
||||||
}
|
}
|
||||||
|
|
||||||
fn stop(&self, element: &super::AppSrc) -> Result<(), gst::ErrorMessage> {
|
fn stop(&self, element: &super::AppSrc) -> Result<(), gst::ErrorMessage> {
|
||||||
gst::debug!(CAT, obj: element, "Stopping");
|
gst::debug!(CAT, obj: element, "Stopping");
|
||||||
self.task.stop()?;
|
self.task.stop().block_on()?;
|
||||||
gst::debug!(CAT, obj: element, "Stopped");
|
gst::debug!(CAT, obj: element, "Stopped");
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn start(&self, element: &super::AppSrc) -> Result<(), gst::ErrorMessage> {
|
fn start(&self, element: &super::AppSrc) -> Result<(), gst::ErrorMessage> {
|
||||||
gst::debug!(CAT, obj: element, "Starting");
|
gst::debug!(CAT, obj: element, "Starting");
|
||||||
self.task.start()?;
|
self.task.start().block_on()?;
|
||||||
gst::debug!(CAT, obj: element, "Started");
|
gst::debug!(CAT, obj: element, "Started");
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn pause(&self, element: &super::AppSrc) -> Result<(), gst::ErrorMessage> {
|
fn pause(&self, element: &super::AppSrc) -> Result<(), gst::ErrorMessage> {
|
||||||
gst::debug!(CAT, obj: element, "Pausing");
|
gst::debug!(CAT, obj: element, "Pausing");
|
||||||
self.task.pause()?;
|
let _ = self.task.pause().check()?;
|
||||||
gst::debug!(CAT, obj: element, "Paused");
|
gst::debug!(CAT, obj: element, "Paused");
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
|
@ -590,7 +590,7 @@ impl PadSinkHandler for SinkHandler {
|
||||||
gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", event);
|
gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", event);
|
||||||
|
|
||||||
if let EventView::FlushStart(..) = event.view() {
|
if let EventView::FlushStart(..) = event.view() {
|
||||||
if let Err(err) = jb.task.flush_start() {
|
if let Err(err) = jb.task.flush_start().await_maybe_on_context() {
|
||||||
gst::error!(CAT, obj: pad.gst_pad(), "FlushStart failed {:?}", err);
|
gst::error!(CAT, obj: pad.gst_pad(), "FlushStart failed {:?}", err);
|
||||||
gst::element_error!(
|
gst::element_error!(
|
||||||
element,
|
element,
|
||||||
|
@ -632,7 +632,7 @@ impl PadSinkHandler for SinkHandler {
|
||||||
state.segment = e.segment().clone().downcast::<gst::format::Time>().unwrap();
|
state.segment = e.segment().clone().downcast::<gst::format::Time>().unwrap();
|
||||||
}
|
}
|
||||||
EventView::FlushStop(..) => {
|
EventView::FlushStop(..) => {
|
||||||
if let Err(err) = jb.task.flush_stop() {
|
if let Err(err) = jb.task.flush_stop().await_maybe_on_context() {
|
||||||
gst::error!(CAT, obj: pad.gst_pad(), "FlushStop failed {:?}", err);
|
gst::error!(CAT, obj: pad.gst_pad(), "FlushStop failed {:?}", err);
|
||||||
gst::element_error!(
|
gst::element_error!(
|
||||||
element,
|
element,
|
||||||
|
@ -908,7 +908,7 @@ impl PadSrcHandler for SrcHandler {
|
||||||
|
|
||||||
match event.view() {
|
match event.view() {
|
||||||
EventView::FlushStart(..) => {
|
EventView::FlushStart(..) => {
|
||||||
if let Err(err) = jb.task.flush_start() {
|
if let Err(err) = jb.task.flush_start().await_maybe_on_context() {
|
||||||
gst::error!(CAT, obj: pad.gst_pad(), "FlushStart failed {:?}", err);
|
gst::error!(CAT, obj: pad.gst_pad(), "FlushStart failed {:?}", err);
|
||||||
gst::element_error!(
|
gst::element_error!(
|
||||||
element,
|
element,
|
||||||
|
@ -920,7 +920,7 @@ impl PadSrcHandler for SrcHandler {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
EventView::FlushStop(..) => {
|
EventView::FlushStop(..) => {
|
||||||
if let Err(err) = jb.task.flush_stop() {
|
if let Err(err) = jb.task.flush_stop().await_maybe_on_context() {
|
||||||
gst::error!(CAT, obj: pad.gst_pad(), "FlushStop failed {:?}", err);
|
gst::error!(CAT, obj: pad.gst_pad(), "FlushStop failed {:?}", err);
|
||||||
gst::element_error!(
|
gst::element_error!(
|
||||||
element,
|
element,
|
||||||
|
@ -1286,12 +1286,7 @@ impl JitterBuffer {
|
||||||
JitterBufferTask::new(element, &self.src_pad_handler, &self.sink_pad_handler),
|
JitterBufferTask::new(element, &self.src_pad_handler, &self.sink_pad_handler),
|
||||||
context,
|
context,
|
||||||
)
|
)
|
||||||
.map_err(|err| {
|
.block_on()?;
|
||||||
gst::error_msg!(
|
|
||||||
gst::ResourceError::OpenRead,
|
|
||||||
["Error preparing Task: {:?}", err]
|
|
||||||
)
|
|
||||||
})?;
|
|
||||||
|
|
||||||
gst::debug!(CAT, obj: element, "Prepared");
|
gst::debug!(CAT, obj: element, "Prepared");
|
||||||
|
|
||||||
|
@ -1300,20 +1295,20 @@ impl JitterBuffer {
|
||||||
|
|
||||||
fn unprepare(&self, element: &super::JitterBuffer) {
|
fn unprepare(&self, element: &super::JitterBuffer) {
|
||||||
gst::debug!(CAT, obj: element, "Unpreparing");
|
gst::debug!(CAT, obj: element, "Unpreparing");
|
||||||
self.task.unprepare().unwrap();
|
self.task.unprepare().block_on().unwrap();
|
||||||
gst::debug!(CAT, obj: element, "Unprepared");
|
gst::debug!(CAT, obj: element, "Unprepared");
|
||||||
}
|
}
|
||||||
|
|
||||||
fn start(&self, element: &super::JitterBuffer) -> Result<(), gst::ErrorMessage> {
|
fn start(&self, element: &super::JitterBuffer) -> Result<(), gst::ErrorMessage> {
|
||||||
gst::debug!(CAT, obj: element, "Starting");
|
gst::debug!(CAT, obj: element, "Starting");
|
||||||
self.task.start()?;
|
self.task.start().block_on()?;
|
||||||
gst::debug!(CAT, obj: element, "Started");
|
gst::debug!(CAT, obj: element, "Started");
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn stop(&self, element: &super::JitterBuffer) -> Result<(), gst::ErrorMessage> {
|
fn stop(&self, element: &super::JitterBuffer) -> Result<(), gst::ErrorMessage> {
|
||||||
gst::debug!(CAT, obj: element, "Stopping");
|
gst::debug!(CAT, obj: element, "Stopping");
|
||||||
self.task.stop()?;
|
self.task.stop().block_on()?;
|
||||||
gst::debug!(CAT, obj: element, "Stopped");
|
gst::debug!(CAT, obj: element, "Stopped");
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
|
@ -734,7 +734,7 @@ impl PadSrcHandler for ProxySrcPadHandler {
|
||||||
|
|
||||||
match event.view() {
|
match event.view() {
|
||||||
EventView::FlushStart(..) => {
|
EventView::FlushStart(..) => {
|
||||||
if let Err(err) = proxysrc.task.flush_start() {
|
if let Err(err) = proxysrc.task.flush_start().await_maybe_on_context() {
|
||||||
gst::error!(SRC_CAT, obj: pad.gst_pad(), "FlushStart failed {:?}", err);
|
gst::error!(SRC_CAT, obj: pad.gst_pad(), "FlushStart failed {:?}", err);
|
||||||
gst::element_error!(
|
gst::element_error!(
|
||||||
element,
|
element,
|
||||||
|
@ -746,7 +746,7 @@ impl PadSrcHandler for ProxySrcPadHandler {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
EventView::FlushStop(..) => {
|
EventView::FlushStop(..) => {
|
||||||
if let Err(err) = proxysrc.task.flush_stop() {
|
if let Err(err) = proxysrc.task.flush_stop().await_maybe_on_context() {
|
||||||
gst::error!(SRC_CAT, obj: pad.gst_pad(), "FlushStop failed {:?}", err);
|
gst::error!(SRC_CAT, obj: pad.gst_pad(), "FlushStop failed {:?}", err);
|
||||||
gst::element_error!(
|
gst::element_error!(
|
||||||
element,
|
element,
|
||||||
|
@ -1045,12 +1045,7 @@ impl ProxySrc {
|
||||||
|
|
||||||
self.task
|
self.task
|
||||||
.prepare(ProxySrcTask::new(element.clone(), dataqueue), ts_ctx)
|
.prepare(ProxySrcTask::new(element.clone(), dataqueue), ts_ctx)
|
||||||
.map_err(|err| {
|
.block_on()?;
|
||||||
gst::error_msg!(
|
|
||||||
gst::ResourceError::OpenRead,
|
|
||||||
["Error preparing Task: {:?}", err]
|
|
||||||
)
|
|
||||||
})?;
|
|
||||||
|
|
||||||
gst::debug!(SRC_CAT, obj: element, "Prepared");
|
gst::debug!(SRC_CAT, obj: element, "Prepared");
|
||||||
|
|
||||||
|
@ -1066,7 +1061,7 @@ impl ProxySrc {
|
||||||
proxy_src_pads.remove(&settings.proxy_context);
|
proxy_src_pads.remove(&settings.proxy_context);
|
||||||
}
|
}
|
||||||
|
|
||||||
self.task.unprepare().unwrap();
|
self.task.unprepare().block_on().unwrap();
|
||||||
|
|
||||||
*self.dataqueue.lock().unwrap() = None;
|
*self.dataqueue.lock().unwrap() = None;
|
||||||
*self.proxy_ctx.lock().unwrap() = None;
|
*self.proxy_ctx.lock().unwrap() = None;
|
||||||
|
@ -1076,21 +1071,21 @@ impl ProxySrc {
|
||||||
|
|
||||||
fn stop(&self, element: &super::ProxySrc) -> Result<(), gst::ErrorMessage> {
|
fn stop(&self, element: &super::ProxySrc) -> Result<(), gst::ErrorMessage> {
|
||||||
gst::debug!(SRC_CAT, obj: element, "Stopping");
|
gst::debug!(SRC_CAT, obj: element, "Stopping");
|
||||||
self.task.stop()?;
|
self.task.stop().await_maybe_on_context()?;
|
||||||
gst::debug!(SRC_CAT, obj: element, "Stopped");
|
gst::debug!(SRC_CAT, obj: element, "Stopped");
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn start(&self, element: &super::ProxySrc) -> Result<(), gst::ErrorMessage> {
|
fn start(&self, element: &super::ProxySrc) -> Result<(), gst::ErrorMessage> {
|
||||||
gst::debug!(SRC_CAT, obj: element, "Starting");
|
gst::debug!(SRC_CAT, obj: element, "Starting");
|
||||||
self.task.start()?;
|
self.task.start().await_maybe_on_context()?;
|
||||||
gst::debug!(SRC_CAT, obj: element, "Started");
|
gst::debug!(SRC_CAT, obj: element, "Started");
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn pause(&self, element: &super::ProxySrc) -> Result<(), gst::ErrorMessage> {
|
fn pause(&self, element: &super::ProxySrc) -> Result<(), gst::ErrorMessage> {
|
||||||
gst::debug!(SRC_CAT, obj: element, "Pausing");
|
gst::debug!(SRC_CAT, obj: element, "Pausing");
|
||||||
self.task.pause()?;
|
let _ = self.task.pause().check()?;
|
||||||
gst::debug!(SRC_CAT, obj: element, "Paused");
|
gst::debug!(SRC_CAT, obj: element, "Paused");
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
|
@ -135,7 +135,7 @@ impl PadSinkHandler for QueuePadSinkHandler {
|
||||||
gst::debug!(CAT, obj: pad.gst_pad(), "Handling non-serialized {:?}", event);
|
gst::debug!(CAT, obj: pad.gst_pad(), "Handling non-serialized {:?}", event);
|
||||||
|
|
||||||
if let EventView::FlushStart(..) = event.view() {
|
if let EventView::FlushStart(..) = event.view() {
|
||||||
if let Err(err) = queue.task.flush_start() {
|
if let Err(err) = queue.task.flush_start().await_maybe_on_context() {
|
||||||
gst::error!(CAT, obj: pad.gst_pad(), "FlushStart failed {:?}", err);
|
gst::error!(CAT, obj: pad.gst_pad(), "FlushStart failed {:?}", err);
|
||||||
gst::element_error!(
|
gst::element_error!(
|
||||||
element,
|
element,
|
||||||
|
@ -169,7 +169,7 @@ impl PadSinkHandler for QueuePadSinkHandler {
|
||||||
let queue = element.imp();
|
let queue = element.imp();
|
||||||
|
|
||||||
if let EventView::FlushStop(..) = event.view() {
|
if let EventView::FlushStop(..) = event.view() {
|
||||||
if let Err(err) = queue.task.flush_stop() {
|
if let Err(err) = queue.task.flush_stop().await_maybe_on_context() {
|
||||||
gst::error!(CAT, obj: pad.gst_pad(), "FlushStop failed {:?}", err);
|
gst::error!(CAT, obj: pad.gst_pad(), "FlushStop failed {:?}", err);
|
||||||
gst::element_error!(
|
gst::element_error!(
|
||||||
element,
|
element,
|
||||||
|
@ -229,12 +229,12 @@ impl PadSrcHandler for QueuePadSrcHandler {
|
||||||
|
|
||||||
match event.view() {
|
match event.view() {
|
||||||
EventView::FlushStart(..) => {
|
EventView::FlushStart(..) => {
|
||||||
if let Err(err) = queue.task.flush_start() {
|
if let Err(err) = queue.task.flush_start().await_maybe_on_context() {
|
||||||
gst::error!(CAT, obj: pad.gst_pad(), "FlushStart failed {:?}", err);
|
gst::error!(CAT, obj: pad.gst_pad(), "FlushStart failed {:?}", err);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
EventView::FlushStop(..) => {
|
EventView::FlushStop(..) => {
|
||||||
if let Err(err) = queue.task.flush_stop() {
|
if let Err(err) = queue.task.flush_stop().await_maybe_on_context() {
|
||||||
gst::error!(CAT, obj: pad.gst_pad(), "FlushStop failed {:?}", err);
|
gst::error!(CAT, obj: pad.gst_pad(), "FlushStop failed {:?}", err);
|
||||||
gst::element_error!(
|
gst::element_error!(
|
||||||
element,
|
element,
|
||||||
|
@ -639,12 +639,7 @@ impl Queue {
|
||||||
|
|
||||||
self.task
|
self.task
|
||||||
.prepare(QueueTask::new(element.clone(), dataqueue), context)
|
.prepare(QueueTask::new(element.clone(), dataqueue), context)
|
||||||
.map_err(|err| {
|
.block_on()?;
|
||||||
gst::error_msg!(
|
|
||||||
gst::ResourceError::OpenRead,
|
|
||||||
["Error preparing Task: {:?}", err]
|
|
||||||
)
|
|
||||||
})?;
|
|
||||||
|
|
||||||
gst::debug!(CAT, obj: element, "Prepared");
|
gst::debug!(CAT, obj: element, "Prepared");
|
||||||
|
|
||||||
|
@ -654,7 +649,7 @@ impl Queue {
|
||||||
fn unprepare(&self, element: &super::Queue) {
|
fn unprepare(&self, element: &super::Queue) {
|
||||||
gst::debug!(CAT, obj: element, "Unpreparing");
|
gst::debug!(CAT, obj: element, "Unpreparing");
|
||||||
|
|
||||||
self.task.unprepare().unwrap();
|
self.task.unprepare().block_on().unwrap();
|
||||||
|
|
||||||
*self.dataqueue.lock().unwrap() = None;
|
*self.dataqueue.lock().unwrap() = None;
|
||||||
*self.pending_queue.lock().unwrap() = None;
|
*self.pending_queue.lock().unwrap() = None;
|
||||||
|
@ -666,14 +661,14 @@ impl Queue {
|
||||||
|
|
||||||
fn stop(&self, element: &super::Queue) -> Result<(), gst::ErrorMessage> {
|
fn stop(&self, element: &super::Queue) -> Result<(), gst::ErrorMessage> {
|
||||||
gst::debug!(CAT, obj: element, "Stopping");
|
gst::debug!(CAT, obj: element, "Stopping");
|
||||||
self.task.stop()?;
|
self.task.stop().await_maybe_on_context()?;
|
||||||
gst::debug!(CAT, obj: element, "Stopped");
|
gst::debug!(CAT, obj: element, "Stopped");
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn start(&self, element: &super::Queue) -> Result<(), gst::ErrorMessage> {
|
fn start(&self, element: &super::Queue) -> Result<(), gst::ErrorMessage> {
|
||||||
gst::debug!(CAT, obj: element, "Starting");
|
gst::debug!(CAT, obj: element, "Starting");
|
||||||
self.task.start()?;
|
self.task.start().await_maybe_on_context()?;
|
||||||
gst::debug!(CAT, obj: element, "Started");
|
gst::debug!(CAT, obj: element, "Started");
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
|
@ -108,8 +108,16 @@ impl PadSrcHandler for TcpClientSrcPadHandler {
|
||||||
gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", event);
|
gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", event);
|
||||||
|
|
||||||
let ret = match event.view() {
|
let ret = match event.view() {
|
||||||
EventView::FlushStart(..) => tcpclientsrc.task.flush_start().is_ok(),
|
EventView::FlushStart(..) => tcpclientsrc
|
||||||
EventView::FlushStop(..) => tcpclientsrc.task.flush_stop().is_ok(),
|
.task
|
||||||
|
.flush_start()
|
||||||
|
.await_maybe_on_context()
|
||||||
|
.is_ok(),
|
||||||
|
EventView::FlushStop(..) => tcpclientsrc
|
||||||
|
.task
|
||||||
|
.flush_stop()
|
||||||
|
.await_maybe_on_context()
|
||||||
|
.is_ok(),
|
||||||
EventView::Reconfigure(..) => true,
|
EventView::Reconfigure(..) => true,
|
||||||
EventView::Latency(..) => true,
|
EventView::Latency(..) => true,
|
||||||
_ => false,
|
_ => false,
|
||||||
|
@ -440,46 +448,45 @@ impl TcpClientSrc {
|
||||||
|
|
||||||
let saddr = SocketAddr::new(host, port as u16);
|
let saddr = SocketAddr::new(host, port as u16);
|
||||||
|
|
||||||
self.task
|
// Don't block on `prepare` as the socket connection takes time.
|
||||||
|
// This will be performed in the background and we'll block on
|
||||||
|
// `start` which will also ensure `prepare` completed successfully.
|
||||||
|
let _ = self
|
||||||
|
.task
|
||||||
.prepare(
|
.prepare(
|
||||||
TcpClientSrcTask::new(element.clone(), saddr, buffer_pool),
|
TcpClientSrcTask::new(element.clone(), saddr, buffer_pool),
|
||||||
context,
|
context,
|
||||||
)
|
)
|
||||||
.map_err(|err| {
|
.check()?;
|
||||||
gst::error_msg!(
|
|
||||||
gst::ResourceError::OpenRead,
|
|
||||||
["Error preparing Task: {:?}", err]
|
|
||||||
)
|
|
||||||
})?;
|
|
||||||
|
|
||||||
gst::debug!(CAT, obj: element, "Prepared");
|
gst::debug!(CAT, obj: element, "Preparing asynchronously");
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn unprepare(&self, element: &super::TcpClientSrc) {
|
fn unprepare(&self, element: &super::TcpClientSrc) {
|
||||||
gst::debug!(CAT, obj: element, "Unpreparing");
|
gst::debug!(CAT, obj: element, "Unpreparing");
|
||||||
self.task.unprepare().unwrap();
|
self.task.unprepare().block_on().unwrap();
|
||||||
gst::debug!(CAT, obj: element, "Unprepared");
|
gst::debug!(CAT, obj: element, "Unprepared");
|
||||||
}
|
}
|
||||||
|
|
||||||
fn stop(&self, element: &super::TcpClientSrc) -> Result<(), gst::ErrorMessage> {
|
fn stop(&self, element: &super::TcpClientSrc) -> Result<(), gst::ErrorMessage> {
|
||||||
gst::debug!(CAT, obj: element, "Stopping");
|
gst::debug!(CAT, obj: element, "Stopping");
|
||||||
self.task.stop()?;
|
self.task.stop().block_on()?;
|
||||||
gst::debug!(CAT, obj: element, "Stopped");
|
gst::debug!(CAT, obj: element, "Stopped");
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn start(&self, element: &super::TcpClientSrc) -> Result<(), gst::ErrorMessage> {
|
fn start(&self, element: &super::TcpClientSrc) -> Result<(), gst::ErrorMessage> {
|
||||||
gst::debug!(CAT, obj: element, "Starting");
|
gst::debug!(CAT, obj: element, "Starting");
|
||||||
self.task.start()?;
|
self.task.start().block_on()?;
|
||||||
gst::debug!(CAT, obj: element, "Started");
|
gst::debug!(CAT, obj: element, "Started");
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn pause(&self, element: &super::TcpClientSrc) -> Result<(), gst::ErrorMessage> {
|
fn pause(&self, element: &super::TcpClientSrc) -> Result<(), gst::ErrorMessage> {
|
||||||
gst::debug!(CAT, obj: element, "Pausing");
|
gst::debug!(CAT, obj: element, "Pausing");
|
||||||
self.task.pause()?;
|
let _ = self.task.pause().check()?;
|
||||||
gst::debug!(CAT, obj: element, "Paused");
|
gst::debug!(CAT, obj: element, "Paused");
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
|
@ -186,7 +186,7 @@ impl PadSinkHandler for UdpSinkPadHandler {
|
||||||
async move {
|
async move {
|
||||||
if let EventView::FlushStop(_) = event.view() {
|
if let EventView::FlushStop(_) = event.view() {
|
||||||
let udpsink = element.imp();
|
let udpsink = element.imp();
|
||||||
return udpsink.task.flush_stop().is_ok();
|
return udpsink.task.flush_stop().await_maybe_on_context().is_ok();
|
||||||
} else if sender.send_async(TaskItem::Event(event)).await.is_err() {
|
} else if sender.send_async(TaskItem::Event(event)).await.is_err() {
|
||||||
gst::debug!(CAT, obj: &element, "Flushing");
|
gst::debug!(CAT, obj: &element, "Flushing");
|
||||||
}
|
}
|
||||||
|
@ -204,7 +204,7 @@ impl PadSinkHandler for UdpSinkPadHandler {
|
||||||
event: gst::Event,
|
event: gst::Event,
|
||||||
) -> bool {
|
) -> bool {
|
||||||
if let EventView::FlushStart(..) = event.view() {
|
if let EventView::FlushStart(..) = event.view() {
|
||||||
return udpsink.task.flush_start().is_ok();
|
return udpsink.task.flush_start().await_maybe_on_context().is_ok();
|
||||||
}
|
}
|
||||||
|
|
||||||
true
|
true
|
||||||
|
@ -805,12 +805,7 @@ impl UdpSink {
|
||||||
let (item_sender, item_receiver) = flume::bounded(0);
|
let (item_sender, item_receiver) = flume::bounded(0);
|
||||||
let (cmd_sender, cmd_receiver) = flume::unbounded();
|
let (cmd_sender, cmd_receiver) = flume::unbounded();
|
||||||
let task_impl = UdpSinkTask::new(element, item_receiver, cmd_receiver);
|
let task_impl = UdpSinkTask::new(element, item_receiver, cmd_receiver);
|
||||||
self.task.prepare(task_impl, context).map_err(|err| {
|
self.task.prepare(task_impl, context).block_on()?;
|
||||||
error_msg!(
|
|
||||||
gst::ResourceError::OpenRead,
|
|
||||||
["Error preparing Task: {:?}", err]
|
|
||||||
)
|
|
||||||
})?;
|
|
||||||
|
|
||||||
*self.item_sender.lock().unwrap() = Some(item_sender);
|
*self.item_sender.lock().unwrap() = Some(item_sender);
|
||||||
*self.cmd_sender.lock().unwrap() = Some(cmd_sender);
|
*self.cmd_sender.lock().unwrap() = Some(cmd_sender);
|
||||||
|
@ -822,20 +817,20 @@ impl UdpSink {
|
||||||
|
|
||||||
fn unprepare(&self, element: &super::UdpSink) {
|
fn unprepare(&self, element: &super::UdpSink) {
|
||||||
gst::debug!(CAT, obj: element, "Unpreparing");
|
gst::debug!(CAT, obj: element, "Unpreparing");
|
||||||
self.task.unprepare().unwrap();
|
self.task.unprepare().block_on().unwrap();
|
||||||
gst::debug!(CAT, obj: element, "Unprepared");
|
gst::debug!(CAT, obj: element, "Unprepared");
|
||||||
}
|
}
|
||||||
|
|
||||||
fn stop(&self, element: &super::UdpSink) -> Result<(), gst::ErrorMessage> {
|
fn stop(&self, element: &super::UdpSink) -> Result<(), gst::ErrorMessage> {
|
||||||
gst::debug!(CAT, obj: element, "Stopping");
|
gst::debug!(CAT, obj: element, "Stopping");
|
||||||
self.task.stop()?;
|
self.task.stop().block_on()?;
|
||||||
gst::debug!(CAT, obj: element, "Stopped");
|
gst::debug!(CAT, obj: element, "Stopped");
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn start(&self, element: &super::UdpSink) -> Result<(), gst::ErrorMessage> {
|
fn start(&self, element: &super::UdpSink) -> Result<(), gst::ErrorMessage> {
|
||||||
gst::debug!(CAT, obj: element, "Starting");
|
gst::debug!(CAT, obj: element, "Starting");
|
||||||
self.task.start()?;
|
self.task.start().block_on()?;
|
||||||
gst::debug!(CAT, obj: element, "Started");
|
gst::debug!(CAT, obj: element, "Started");
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
|
@ -125,8 +125,8 @@ impl PadSrcHandler for UdpSrcPadHandler {
|
||||||
gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", event);
|
gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", event);
|
||||||
|
|
||||||
let ret = match event.view() {
|
let ret = match event.view() {
|
||||||
EventView::FlushStart(..) => udpsrc.task.flush_start().is_ok(),
|
EventView::FlushStart(..) => udpsrc.task.flush_start().await_maybe_on_context().is_ok(),
|
||||||
EventView::FlushStop(..) => udpsrc.task.flush_stop().is_ok(),
|
EventView::FlushStop(..) => udpsrc.task.flush_stop().await_maybe_on_context().is_ok(),
|
||||||
EventView::Reconfigure(..) => true,
|
EventView::Reconfigure(..) => true,
|
||||||
EventView::Latency(..) => true,
|
EventView::Latency(..) => true,
|
||||||
_ => false,
|
_ => false,
|
||||||
|
@ -587,12 +587,7 @@ impl UdpSrc {
|
||||||
*self.configured_caps.lock().unwrap() = None;
|
*self.configured_caps.lock().unwrap() = None;
|
||||||
self.task
|
self.task
|
||||||
.prepare(UdpSrcTask::new(element.clone()), context)
|
.prepare(UdpSrcTask::new(element.clone()), context)
|
||||||
.map_err(|err| {
|
.block_on()?;
|
||||||
gst::error_msg!(
|
|
||||||
gst::ResourceError::OpenRead,
|
|
||||||
["Error preparing Task: {:?}", err]
|
|
||||||
)
|
|
||||||
})?;
|
|
||||||
|
|
||||||
gst::debug!(CAT, obj: element, "Prepared");
|
gst::debug!(CAT, obj: element, "Prepared");
|
||||||
|
|
||||||
|
@ -601,27 +596,27 @@ impl UdpSrc {
|
||||||
|
|
||||||
fn unprepare(&self, element: &super::UdpSrc) {
|
fn unprepare(&self, element: &super::UdpSrc) {
|
||||||
gst::debug!(CAT, obj: element, "Unpreparing");
|
gst::debug!(CAT, obj: element, "Unpreparing");
|
||||||
self.task.unprepare().unwrap();
|
self.task.unprepare().block_on().unwrap();
|
||||||
gst::debug!(CAT, obj: element, "Unprepared");
|
gst::debug!(CAT, obj: element, "Unprepared");
|
||||||
}
|
}
|
||||||
|
|
||||||
fn stop(&self, element: &super::UdpSrc) -> Result<(), gst::ErrorMessage> {
|
fn stop(&self, element: &super::UdpSrc) -> Result<(), gst::ErrorMessage> {
|
||||||
gst::debug!(CAT, obj: element, "Stopping");
|
gst::debug!(CAT, obj: element, "Stopping");
|
||||||
self.task.stop()?;
|
self.task.stop().block_on()?;
|
||||||
gst::debug!(CAT, obj: element, "Stopped");
|
gst::debug!(CAT, obj: element, "Stopped");
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn start(&self, element: &super::UdpSrc) -> Result<(), gst::ErrorMessage> {
|
fn start(&self, element: &super::UdpSrc) -> Result<(), gst::ErrorMessage> {
|
||||||
gst::debug!(CAT, obj: element, "Starting");
|
gst::debug!(CAT, obj: element, "Starting");
|
||||||
self.task.start()?;
|
self.task.start().block_on()?;
|
||||||
gst::debug!(CAT, obj: element, "Started");
|
gst::debug!(CAT, obj: element, "Started");
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn pause(&self, element: &super::UdpSrc) -> Result<(), gst::ErrorMessage> {
|
fn pause(&self, element: &super::UdpSrc) -> Result<(), gst::ErrorMessage> {
|
||||||
gst::debug!(CAT, obj: element, "Pausing");
|
gst::debug!(CAT, obj: element, "Pausing");
|
||||||
self.task.pause()?;
|
let _ = self.task.pause().check()?;
|
||||||
gst::debug!(CAT, obj: element, "Paused");
|
gst::debug!(CAT, obj: element, "Paused");
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
|
@ -97,15 +97,17 @@ mod imp_src {
|
||||||
gst::log!(SRC_CAT, obj: pad.gst_pad(), "Handling {:?}", event);
|
gst::log!(SRC_CAT, obj: pad.gst_pad(), "Handling {:?}", event);
|
||||||
|
|
||||||
let ret = match event.view() {
|
let ret = match event.view() {
|
||||||
EventView::FlushStart(..) => {
|
EventView::FlushStart(..) => elem_src_test
|
||||||
elem_src_test.task.flush_start().unwrap();
|
.task
|
||||||
true
|
.flush_start()
|
||||||
}
|
.await_maybe_on_context()
|
||||||
|
.is_ok(),
|
||||||
EventView::Qos(..) | EventView::Reconfigure(..) | EventView::Latency(..) => true,
|
EventView::Qos(..) | EventView::Reconfigure(..) | EventView::Latency(..) => true,
|
||||||
EventView::FlushStop(..) => {
|
EventView::FlushStop(..) => elem_src_test
|
||||||
elem_src_test.task.flush_stop().unwrap();
|
.task
|
||||||
true
|
.flush_stop()
|
||||||
}
|
.await_maybe_on_context()
|
||||||
|
.is_ok(),
|
||||||
_ => false,
|
_ => false,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -243,12 +245,7 @@ mod imp_src {
|
||||||
|
|
||||||
self.task
|
self.task
|
||||||
.prepare(ElementSrcTestTask::new(element.clone(), receiver), context)
|
.prepare(ElementSrcTestTask::new(element.clone(), receiver), context)
|
||||||
.map_err(|err| {
|
.block_on()?;
|
||||||
gst::error_msg!(
|
|
||||||
gst::ResourceError::Failed,
|
|
||||||
["Error preparing Task: {:?}", err]
|
|
||||||
)
|
|
||||||
})?;
|
|
||||||
|
|
||||||
gst::debug!(SRC_CAT, obj: element, "Prepared");
|
gst::debug!(SRC_CAT, obj: element, "Prepared");
|
||||||
|
|
||||||
|
@ -259,26 +256,26 @@ mod imp_src {
|
||||||
gst::debug!(SRC_CAT, obj: element, "Unpreparing");
|
gst::debug!(SRC_CAT, obj: element, "Unpreparing");
|
||||||
|
|
||||||
*self.sender.lock().unwrap() = None;
|
*self.sender.lock().unwrap() = None;
|
||||||
self.task.unprepare().unwrap();
|
self.task.unprepare().block_on().unwrap();
|
||||||
|
|
||||||
gst::debug!(SRC_CAT, obj: element, "Unprepared");
|
gst::debug!(SRC_CAT, obj: element, "Unprepared");
|
||||||
}
|
}
|
||||||
|
|
||||||
fn stop(&self, element: &super::ElementSrcTest) {
|
fn stop(&self, element: &super::ElementSrcTest) {
|
||||||
gst::debug!(SRC_CAT, obj: element, "Stopping");
|
gst::debug!(SRC_CAT, obj: element, "Stopping");
|
||||||
self.task.stop().unwrap();
|
self.task.stop().await_maybe_on_context().unwrap();
|
||||||
gst::debug!(SRC_CAT, obj: element, "Stopped");
|
gst::debug!(SRC_CAT, obj: element, "Stopped");
|
||||||
}
|
}
|
||||||
|
|
||||||
fn start(&self, element: &super::ElementSrcTest) {
|
fn start(&self, element: &super::ElementSrcTest) {
|
||||||
gst::debug!(SRC_CAT, obj: element, "Starting");
|
gst::debug!(SRC_CAT, obj: element, "Starting");
|
||||||
self.task.start().unwrap();
|
self.task.start().await_maybe_on_context().unwrap();
|
||||||
gst::debug!(SRC_CAT, obj: element, "Started");
|
gst::debug!(SRC_CAT, obj: element, "Started");
|
||||||
}
|
}
|
||||||
|
|
||||||
fn pause(&self, element: &super::ElementSrcTest) {
|
fn pause(&self, element: &super::ElementSrcTest) {
|
||||||
gst::debug!(SRC_CAT, obj: element, "Pausing");
|
gst::debug!(SRC_CAT, obj: element, "Pausing");
|
||||||
self.task.pause().unwrap();
|
let _ = self.task.pause().check().unwrap();
|
||||||
gst::debug!(SRC_CAT, obj: element, "Paused");
|
gst::debug!(SRC_CAT, obj: element, "Paused");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -421,10 +418,10 @@ mod imp_src {
|
||||||
fn send_event(&self, _element: &Self::Type, event: gst::Event) -> bool {
|
fn send_event(&self, _element: &Self::Type, event: gst::Event) -> bool {
|
||||||
match event.view() {
|
match event.view() {
|
||||||
EventView::FlushStart(..) => {
|
EventView::FlushStart(..) => {
|
||||||
self.task.flush_start().unwrap();
|
self.task.flush_start().await_maybe_on_context().unwrap();
|
||||||
}
|
}
|
||||||
EventView::FlushStop(..) => {
|
EventView::FlushStop(..) => {
|
||||||
self.task.flush_stop().unwrap();
|
self.task.flush_stop().await_maybe_on_context().unwrap();
|
||||||
}
|
}
|
||||||
_ => (),
|
_ => (),
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue