diff --git a/gst-plugin-audiofx/src/audioecho.rs b/gst-plugin-audiofx/src/audioecho.rs index 2330762b..21162213 100644 --- a/gst-plugin-audiofx/src/audioecho.rs +++ b/gst-plugin-audiofx/src/audioecho.rs @@ -254,20 +254,14 @@ impl BaseTransformImpl for AudioEcho { &self, _element: &gst_base::BaseTransform, buf: &mut gst::BufferRef, - ) -> gst::FlowReturn { + ) -> Result { let mut settings = *self.settings.lock().unwrap(); settings.delay = cmp::min(settings.max_delay, settings.delay); let mut state_guard = self.state.lock().unwrap(); - let state = match *state_guard { - None => return gst::FlowReturn::NotNegotiated, - Some(ref mut state) => state, - }; + let state = state_guard.as_mut().ok_or(gst::FlowError::NotNegotiated)?; - let mut map = match buf.map_writable() { - None => return gst::FlowReturn::Error, - Some(map) => map, - }; + let mut map = buf.map_writable().ok_or(gst::FlowError::Error)?; match state.info.format() { gst_audio::AUDIO_FORMAT_F64 => { @@ -278,10 +272,10 @@ impl BaseTransformImpl for AudioEcho { let data = map.as_mut_slice_of::().unwrap(); Self::process(data, state, &settings); } - _ => return gst::FlowReturn::NotNegotiated, + _ => return Err(gst::FlowError::NotNegotiated), } - gst::FlowReturn::Ok + Ok(gst::FlowSuccess::Ok) } fn set_caps( diff --git a/gst-plugin-file/src/filesink.rs b/gst-plugin-file/src/filesink.rs index 7bed04c9..394d61f9 100644 --- a/gst-plugin-file/src/filesink.rs +++ b/gst-plugin-file/src/filesink.rs @@ -265,7 +265,11 @@ impl BaseSinkImpl for FileSink { // TODO: implement seek in BYTES format - fn render(&self, element: &gst_base::BaseSink, buffer: &gst::BufferRef) -> gst::FlowReturn { + fn render( + &self, + element: &gst_base::BaseSink, + buffer: &gst::BufferRef, + ) -> Result { let mut state = self.state.lock().unwrap(); let (file, position) = match *state { State::Started { @@ -274,35 +278,28 @@ impl BaseSinkImpl for FileSink { } => (file, position), State::Stopped => { gst_element_error!(element, gst::CoreError::Failed, ["Not started yet"]); - return gst::FlowReturn::Error; + return Err(gst::FlowError::Error); } }; gst_trace!(self.cat, obj: element, "Rendering {:?}", buffer); - let map = match buffer.map_readable() { - None => { - gst_element_error!(element, gst::CoreError::Failed, ["Failed to map buffer"]); - return gst::FlowReturn::Error; - } - Some(map) => map, - }; + let map = buffer.map_readable().ok_or_else(|| { + gst_element_error!(element, gst::CoreError::Failed, ["Failed to map buffer"]); + gst::FlowError::Error + })?; - match file.write_all(map.as_ref()) { - Ok(()) => { - *position += map.len() as u64; + file.write_all(map.as_ref()).map_err(|err| { + gst_element_error!( + element, + gst::ResourceError::Write, + ["Failed to write buffer: {}", err] + ); + gst::FlowError::Error + })?; - gst::FlowReturn::Ok - } - Err(err) => { - gst_element_error!( - element, - gst::ResourceError::Write, - ["Failed to write buffer: {}", err] - ); + *position += map.len() as u64; - gst::FlowReturn::Error - } - } + Ok(gst::FlowSuccess::Ok) } } diff --git a/gst-plugin-file/src/filesrc.rs b/gst-plugin-file/src/filesrc.rs index bbef102a..d95f9dc7 100644 --- a/gst-plugin-file/src/filesrc.rs +++ b/gst-plugin-file/src/filesrc.rs @@ -306,7 +306,7 @@ impl BaseSrcImpl for FileSrc { offset: u64, _length: u32, buffer: &mut gst::BufferRef, - ) -> gst::FlowReturn { + ) -> Result { let mut state = self.state.lock().unwrap(); let (file, position) = match *state { @@ -316,54 +316,44 @@ impl BaseSrcImpl for FileSrc { } => (file, position), State::Stopped => { gst_element_error!(element, gst::CoreError::Failed, ["Not started yet"]); - return gst::FlowReturn::Error; + return Err(gst::FlowError::Error); } }; if *position != offset { - if let Err(err) = file.seek(SeekFrom::Start(offset)) { + file.seek(SeekFrom::Start(offset)).map_err(|err| { gst_element_error!( element, gst::LibraryError::Failed, ["Failed to seek to {}: {}", offset, err.to_string()] ); - return gst::FlowReturn::Error; - } + gst::FlowError::Error + })?; *position = offset; } let size = { - let mut map = match buffer.map_writable() { - Some(map) => map, - None => { - gst_element_error!( - element, - gst::LibraryError::Failed, - ["Failed to map buffer"] - ); - return gst::FlowReturn::Error; - } - }; + let mut map = buffer.map_writable().ok_or_else(|| { + gst_element_error!(element, gst::LibraryError::Failed, ["Failed to map buffer"]); + gst::FlowError::Error + })?; - match file.read(map.as_mut()) { - Ok(size) => size, - Err(err) => { - gst_element_error!( - element, - gst::LibraryError::Failed, - ["Failed to read at {}: {}", offset, err.to_string()] - ); - return gst::FlowReturn::Error; - } - } + file.read(map.as_mut()).map_err(|err| { + gst_element_error!( + element, + gst::LibraryError::Failed, + ["Failed to read at {}: {}", offset, err.to_string()] + ); + gst::FlowError::Error + })? }; *position += size as u64; buffer.set_size(size); - gst::FlowReturn::Ok + Ok(gst::FlowSuccess::Ok) } } diff --git a/gst-plugin-flv/src/flvdemux.rs b/gst-plugin-flv/src/flvdemux.rs index 4c43f345..94adcc12 100644 --- a/gst-plugin-flv/src/flvdemux.rs +++ b/gst-plugin-flv/src/flvdemux.rs @@ -158,7 +158,7 @@ impl ObjectSubclass for FlvDemux { sinkpad.set_chain_function(|pad, parent, buffer| { FlvDemux::catch_panic_pad_function( parent, - || gst::FlowReturn::Error, + || Err(gst::FlowError::Error), |demux, element| demux.sink_chain(pad, element, buffer), ) }); @@ -461,7 +461,7 @@ impl FlvDemux { pad: &gst::Pad, element: &gst::Element, buffer: gst::Buffer, - ) -> gst::FlowReturn { + ) -> Result { gst_log!(CAT, obj: pad, "Handling buffer {:?}", buffer); let mut adapter = self.adapter.lock().unwrap(); @@ -476,7 +476,7 @@ impl FlvDemux { Ok(header) => header, Err(_) => { gst_trace!(CAT, obj: element, "Need more data"); - return gst::FlowReturn::Ok; + return Ok(gst::FlowSuccess::Ok); } }; @@ -505,7 +505,7 @@ impl FlvDemux { let avail = adapter.available(); if avail == 0 { gst_trace!(CAT, obj: element, "Need more data"); - return gst::FlowReturn::Ok; + return Ok(gst::FlowSuccess::Ok); } let skip = cmp::min(avail, *skip_left as usize); adapter.flush(skip); @@ -517,22 +517,20 @@ impl FlvDemux { match res { Ok(None) => { gst_trace!(CAT, obj: element, "Need more data"); - return gst::FlowReturn::Ok; + return Ok(gst::FlowSuccess::Ok); } Ok(Some(events)) => { drop(state); drop(adapter); - if let Err(err) = self.handle_events(element, events) { - return err.into(); - } + self.handle_events(element, events)?; adapter = self.adapter.lock().unwrap(); state = self.state.lock().unwrap(); } Err(err) => { element.post_error_message(&err); - return gst::FlowReturn::Error; + return Err(gst::FlowError::Error); } } } @@ -622,8 +620,7 @@ impl FlvDemux { self.flow_combiner .lock() .unwrap() - .update_pad_flow(&pad, res) - .into_result()?; + .update_pad_flow(&pad, res)?; } } Event::HaveAllStreams => { diff --git a/gst-plugin-http/src/httpsrc.rs b/gst-plugin-http/src/httpsrc.rs index a238d88c..23f21d2d 100644 --- a/gst-plugin-http/src/httpsrc.rs +++ b/gst-plugin-http/src/httpsrc.rs @@ -350,7 +350,7 @@ impl BaseSrcImpl for HttpSrc { offset: u64, _: u32, buffer: &mut gst::BufferRef, - ) -> gst::FlowReturn { + ) -> Result { let mut state = self.state.lock().unwrap(); let (response, position) = match *state { @@ -362,7 +362,7 @@ impl BaseSrcImpl for HttpSrc { State::Stopped => { gst_element_error!(src, gst::LibraryError::Failed, ["Not started yet"]); - return gst::FlowReturn::Error; + return Err(gst::FlowError::Error); } }; @@ -373,45 +373,39 @@ impl BaseSrcImpl for HttpSrc { ["Got unexpected offset {}, expected {}", offset, position] ); - return gst::FlowReturn::Error; + return Err(gst::FlowError::Error); } let size = { - let mut map = match buffer.map_writable() { - None => { - gst_element_error!(src, gst::LibraryError::Failed, ["Failed to map buffer"]); + let mut map = buffer.map_writable().ok_or_else(|| { + gst_element_error!(src, gst::LibraryError::Failed, ["Failed to map buffer"]); - return gst::FlowReturn::Error; - } - Some(map) => map, - }; + gst::FlowError::Error + })?; let data = map.as_mut_slice(); - match response.read(data) { - Ok(size) => size, - Err(err) => { - gst_error!(self.cat, obj: src, "Failed to read: {:?}", err); - gst_element_error!( - src, - gst::ResourceError::Read, - ["Failed to read at {}: {}", offset, err.to_string()] - ); + response.read(data).map_err(|err| { + gst_error!(self.cat, obj: src, "Failed to read: {:?}", err); + gst_element_error!( + src, + gst::ResourceError::Read, + ["Failed to read at {}: {}", offset, err.to_string()] + ); - return gst::FlowReturn::Error; - } - } + gst::FlowError::Error + })? }; if size == 0 { - return gst::FlowReturn::Eos; + return Err(gst::FlowError::Eos); } *position += size as u64; buffer.set_size(size); - gst::FlowReturn::Ok + Ok(gst::FlowSuccess::Ok) } } diff --git a/gst-plugin-threadshare/Cargo.toml b/gst-plugin-threadshare/Cargo.toml index 211adeaf..68da879d 100644 --- a/gst-plugin-threadshare/Cargo.toml +++ b/gst-plugin-threadshare/Cargo.toml @@ -13,6 +13,7 @@ gstreamer-sys = { git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs-s glib = { git = "https://github.com/gtk-rs/glib", features = ["subclassing"] } gio = { git = "https://github.com/gtk-rs/gio" } gstreamer = { git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["subclassing"] } +gstreamer-app = { git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" } gstreamer-check = { git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" } tokio = "0.1" tokio-reactor = "0.1" diff --git a/gst-plugin-threadshare/examples/benchmark.rs b/gst-plugin-threadshare/examples/benchmark.rs index 98d941bc..fd677a47 100644 --- a/gst-plugin-threadshare/examples/benchmark.rs +++ b/gst-plugin-threadshare/examples/benchmark.rs @@ -164,10 +164,7 @@ fn main() { glib::Continue(true) }); - assert_ne!( - pipeline.set_state(gst::State::Playing), - gst::StateChangeReturn::Failure - ); + pipeline.set_state(gst::State::Playing).unwrap(); println!("started"); diff --git a/gst-plugin-threadshare/src/appsrc.rs b/gst-plugin-threadshare/src/appsrc.rs index 96a327a9..70788810 100644 --- a/gst-plugin-threadshare/src/appsrc.rs +++ b/gst-plugin-threadshare/src/appsrc.rs @@ -173,9 +173,9 @@ impl AppSrc { true } EventView::FlushStop(..) => { - let (ret, state, pending) = element.get_state(0.into()); - if ret == gst::StateChangeReturn::Success && state == gst::State::Playing - || ret == gst::StateChangeReturn::Async && pending == gst::State::Playing + let (res, state, pending) = element.get_state(0.into()); + if res == Ok(gst::StateChangeSuccess::Success) && state == gst::State::Playing + || res == Ok(gst::StateChangeSuccess::Async) && pending == gst::State::Playing { let _ = self.start(element); } @@ -331,7 +331,7 @@ impl AppSrc { let res = match item { Either::Left(buffer) => { gst_log!(self.cat, obj: element, "Forwarding buffer {:?}", buffer); - self.src_pad.push(buffer).into_result().map(|_| ()) + self.src_pad.push(buffer).map(|_| ()) } Either::Right(event) => { gst_log!(self.cat, obj: element, "Forwarding event {:?}", event); @@ -638,41 +638,34 @@ impl ElementImpl for AppSrc { &self, element: &gst::Element, transition: gst::StateChange, - ) -> gst::StateChangeReturn { + ) -> Result { gst_trace!(self.cat, obj: element, "Changing state {:?}", transition); match transition { - gst::StateChange::NullToReady => match self.prepare(element) { - Err(err) => { + gst::StateChange::NullToReady => { + self.prepare(element).map_err(|err| { element.post_error_message(&err); - return gst::StateChangeReturn::Failure; - } - Ok(_) => (), - }, - gst::StateChange::PlayingToPaused => match self.stop(element) { - Err(_) => return gst::StateChangeReturn::Failure, - Ok(_) => (), - }, - gst::StateChange::ReadyToNull => match self.unprepare(element) { - Err(_) => return gst::StateChangeReturn::Failure, - Ok(_) => (), - }, + gst::StateChangeError + })?; + } + gst::StateChange::PlayingToPaused => { + self.stop(element).map_err(|_| gst::StateChangeError)?; + } + gst::StateChange::ReadyToNull => { + self.unprepare(element).map_err(|_| gst::StateChangeError)?; + } _ => (), } - let mut ret = self.parent_change_state(element, transition); - if ret == gst::StateChangeReturn::Failure { - return ret; - } + let mut success = self.parent_change_state(element, transition)?; match transition { gst::StateChange::ReadyToPaused => { - ret = gst::StateChangeReturn::NoPreroll; + success = gst::StateChangeSuccess::NoPreroll; + } + gst::StateChange::PausedToPlaying => { + self.start(element).map_err(|_| gst::StateChangeError)?; } - gst::StateChange::PausedToPlaying => match self.start(element) { - Err(_) => return gst::StateChangeReturn::Failure, - Ok(_) => (), - }, gst::StateChange::PausedToReady => { let mut state = self.state.lock().unwrap(); state.need_initial_events = true; @@ -680,7 +673,7 @@ impl ElementImpl for AppSrc { _ => (), } - ret + Ok(success) } } diff --git a/gst-plugin-threadshare/src/proxy.rs b/gst-plugin-threadshare/src/proxy.rs index fbdb8c5f..8c7be572 100644 --- a/gst-plugin-threadshare/src/proxy.rs +++ b/gst-plugin-threadshare/src/proxy.rs @@ -198,7 +198,7 @@ impl SharedQueue { let inner = Arc::new(Mutex::new(SharedQueueInner { name: name.into(), queue: None, - last_ret: gst::FlowReturn::Flushing, + last_res: Err(gst::FlowError::Flushing), pending_queue: None, pending_future_cancel: None, have_sink: as_sink, @@ -236,7 +236,7 @@ impl Drop for SharedQueue { struct SharedQueueInner { name: String, queue: Option, - last_ret: gst::FlowReturn, + last_res: Result, pending_queue: Option<(Option, bool, VecDeque)>, pending_future_cancel: Option>, have_sink: bool, @@ -296,7 +296,7 @@ impl ProxySink { _pad: &gst::Pad, element: &gst::Element, item: DataQueueItem, - ) -> gst::FlowReturn { + ) -> Result { let wait_future = { let state = self.state.lock().unwrap(); let StateSink { @@ -305,10 +305,7 @@ impl ProxySink { pending_future_id, .. } = *state; - let queue = match *queue { - None => return gst::FlowReturn::Error, - Some(ref queue) => queue, - }; + let queue = queue.as_ref().ok_or(gst::FlowError::Error)?; let mut queue = queue.0.lock().unwrap(); @@ -472,23 +469,20 @@ impl ProxySink { if let Some(wait_future) = wait_future { gst_log!(self.cat, obj: element, "Blocking until queue becomes empty"); - match executor::current_thread::block_on_all(wait_future) { - Err(_) => { - gst_element_error!( - element, - gst::StreamError::Failed, - ["failed to wait for queue to become empty again"] - ); - return gst::FlowReturn::Error; - } - Ok(_) => (), - } + executor::current_thread::block_on_all(wait_future).map_err(|_| { + gst_element_error!( + element, + gst::StreamError::Failed, + ["failed to wait for queue to become empty again"] + ); + gst::FlowError::Error + })?; } let state = self.state.lock().unwrap(); let queue = state.queue.as_ref().unwrap(); - let ret = queue.0.lock().unwrap().last_ret; - ret + let res = queue.0.lock().unwrap().last_res; + res } fn sink_chain( @@ -496,7 +490,7 @@ impl ProxySink { pad: &gst::Pad, element: &gst::Element, buffer: gst::Buffer, - ) -> gst::FlowReturn { + ) -> Result { gst_log!(self.cat, obj: pad, "Handling buffer {:?}", buffer); self.enqueue_item(pad, element, DataQueueItem::Buffer(buffer)) } @@ -506,7 +500,7 @@ impl ProxySink { pad: &gst::Pad, element: &gst::Element, list: gst::BufferList, - ) -> gst::FlowReturn { + ) -> Result { gst_log!(self.cat, obj: pad, "Handling buffer list {:?}", list); self.enqueue_item(pad, element, DataQueueItem::BufferList(list)) } @@ -524,9 +518,9 @@ impl ProxySink { let _ = self.stop(element); } EventView::FlushStop(..) => { - let (ret, state, pending) = element.get_state(0.into()); - if ret == gst::StateChangeReturn::Success && state == gst::State::Paused - || ret == gst::StateChangeReturn::Async && pending == gst::State::Paused + let (res, state, pending) = element.get_state(0.into()); + if res == Ok(gst::StateChangeSuccess::Success) && state == gst::State::Paused + || res == Ok(gst::StateChangeSuccess::Async) && pending == gst::State::Paused { let _ = self.start(element); } @@ -605,7 +599,7 @@ impl ProxySink { let state = self.state.lock().unwrap(); let mut queue = state.queue.as_ref().unwrap().0.lock().unwrap(); - queue.last_ret = gst::FlowReturn::Ok; + queue.last_res = Ok(gst::FlowSuccess::Ok); gst_debug!(self.cat, obj: element, "Started"); @@ -624,7 +618,7 @@ impl ProxySink { if let Some((Some(task), _, _)) = queue.pending_queue.take() { task.notify(); } - queue.last_ret = gst::FlowReturn::Flushing; + queue.last_res = Err(gst::FlowError::Flushing); gst_debug!(self.cat, obj: element, "Stopped"); @@ -668,14 +662,14 @@ impl ObjectSubclass for ProxySink { sink_pad.set_chain_function(|pad, parent, buffer| { ProxySink::catch_panic_pad_function( parent, - || gst::FlowReturn::Error, + || Err(gst::FlowError::Error), |queue, element| queue.sink_chain(pad, element, buffer), ) }); sink_pad.set_chain_list_function(|pad, parent, list| { ProxySink::catch_panic_pad_function( parent, - || gst::FlowReturn::Error, + || Err(gst::FlowError::Error), |queue, element| queue.sink_chain_list(pad, element, list), ) }); @@ -749,42 +743,32 @@ impl ElementImpl for ProxySink { &self, element: &gst::Element, transition: gst::StateChange, - ) -> gst::StateChangeReturn { + ) -> Result { gst_trace!(self.cat, obj: element, "Changing state {:?}", transition); match transition { - gst::StateChange::NullToReady => match self.prepare(element) { - Err(err) => { + gst::StateChange::NullToReady => { + self.prepare(element).map_err(|err| { element.post_error_message(&err); - return gst::StateChangeReturn::Failure; - } - Ok(_) => (), - }, - gst::StateChange::PausedToReady => match self.stop(element) { - Err(_) => return gst::StateChangeReturn::Failure, - Ok(_) => (), - }, - gst::StateChange::ReadyToNull => match self.unprepare(element) { - Err(_) => return gst::StateChangeReturn::Failure, - Ok(_) => (), - }, + gst::StateChangeError + })?; + } + gst::StateChange::PausedToReady => { + self.stop(element).map_err(|_| gst::StateChangeError)?; + } + gst::StateChange::ReadyToNull => { + self.unprepare(element).map_err(|_| gst::StateChangeError)?; + } _ => (), } - let ret = self.parent_change_state(element, transition); - if ret == gst::StateChangeReturn::Failure { - return ret; + let success = self.parent_change_state(element, transition)?; + + if transition == gst::StateChange::ReadyToPaused { + self.start(element).map_err(|_| gst::StateChangeError)?; } - match transition { - gst::StateChange::ReadyToPaused => match self.start(element) { - Err(_) => return gst::StateChangeReturn::Failure, - Ok(_) => (), - }, - _ => (), - } - - ret + Ok(success) } } @@ -824,9 +808,9 @@ impl ProxySrc { true } EventView::FlushStop(..) => { - let (ret, state, pending) = element.get_state(0.into()); - if ret == gst::StateChangeReturn::Success && state == gst::State::Playing - || ret == gst::StateChangeReturn::Async && pending == gst::State::Playing + let (res, state, pending) = element.get_state(0.into()); + if res == Ok(gst::StateChangeSuccess::Success) && state == gst::State::Playing + || res == Ok(gst::StateChangeSuccess::Async) && pending == gst::State::Playing { let _ = self.start(element); } @@ -920,11 +904,11 @@ impl ProxySrc { let res = match item { DataQueueItem::Buffer(buffer) => { gst_log!(self.cat, obj: element, "Forwarding buffer {:?}", buffer); - self.src_pad.push(buffer).into_result().map(|_| ()) + self.src_pad.push(buffer).map(|_| ()) } DataQueueItem::BufferList(list) => { gst_log!(self.cat, obj: element, "Forwarding buffer list {:?}", list); - self.src_pad.push_list(list).into_result().map(|_| ()) + self.src_pad.push_list(list).map(|_| ()) } DataQueueItem::Event(event) => { use gst::EventView; @@ -960,7 +944,7 @@ impl ProxySrc { gst_log!(self.cat, obj: element, "Successfully pushed item"); let state = self.state.lock().unwrap(); let mut queue = state.queue.as_ref().unwrap().0.lock().unwrap(); - queue.last_ret = gst::FlowReturn::Ok; + queue.last_res = Ok(gst::FlowSuccess::Ok); Ok(()) } Err(gst::FlowError::Flushing) => { @@ -970,7 +954,7 @@ impl ProxySrc { if let Some(ref queue) = queue.queue { queue.pause(); } - queue.last_ret = gst::FlowReturn::Flushing; + queue.last_res = Err(gst::FlowError::Flushing); Ok(()) } Err(gst::FlowError::Eos) => { @@ -980,7 +964,7 @@ impl ProxySrc { if let Some(ref queue) = queue.queue { queue.pause(); } - queue.last_ret = gst::FlowReturn::Eos; + queue.last_res = Err(gst::FlowError::Eos); Ok(()) } Err(err) => { @@ -993,7 +977,7 @@ impl ProxySrc { ); let state = self.state.lock().unwrap(); let mut queue = state.queue.as_ref().unwrap().0.lock().unwrap(); - queue.last_ret = gst::FlowReturn::from_error(err); + queue.last_res = Err(err); Err(gst::FlowError::CustomError) } }; @@ -1036,15 +1020,9 @@ impl ProxySrc { ) })?; - let queue = match SharedQueue::get(&settings.proxy_context, false) { - Some(queue) => queue, - None => { - return Err(gst_error_msg!( - gst::ResourceError::OpenRead, - ["Failed to create get queue"] - )); - } - }; + let queue = SharedQueue::get(&settings.proxy_context, false).ok_or_else(|| { + gst_error_msg!(gst::ResourceError::OpenRead, ["Failed to create get queue"]) + })?; let dataqueue = DataQueue::new( &element.clone().upcast(), @@ -1095,7 +1073,7 @@ impl ProxySrc { gst::ResourceError::OpenRead, ["Failed to schedule data queue"] ) - })?;; + })?; let pending_future_id = io_context.acquire_pending_future_id(); gst_debug!( @@ -1332,45 +1310,38 @@ impl ElementImpl for ProxySrc { &self, element: &gst::Element, transition: gst::StateChange, - ) -> gst::StateChangeReturn { + ) -> Result { gst_trace!(self.cat, obj: element, "Changing state {:?}", transition); match transition { - gst::StateChange::NullToReady => match self.prepare(element) { - Err(err) => { + gst::StateChange::NullToReady => { + self.prepare(element).map_err(|err| { element.post_error_message(&err); - return gst::StateChangeReturn::Failure; - } - Ok(_) => (), - }, - gst::StateChange::PlayingToPaused => match self.stop(element) { - Err(_) => return gst::StateChangeReturn::Failure, - Ok(_) => (), - }, - gst::StateChange::ReadyToNull => match self.unprepare(element) { - Err(_) => return gst::StateChangeReturn::Failure, - Ok(_) => (), - }, + gst::StateChangeError + })?; + } + gst::StateChange::PlayingToPaused => { + self.stop(element).map_err(|_| gst::StateChangeError)?; + } + gst::StateChange::ReadyToNull => { + self.unprepare(element).map_err(|_| gst::StateChangeError)?; + } _ => (), } - let mut ret = self.parent_change_state(element, transition); - if ret == gst::StateChangeReturn::Failure { - return ret; - } + let mut success = self.parent_change_state(element, transition)?; match transition { gst::StateChange::ReadyToPaused => { - ret = gst::StateChangeReturn::NoPreroll; + success = gst::StateChangeSuccess::NoPreroll; + } + gst::StateChange::PausedToPlaying => { + self.start(element).map_err(|_| gst::StateChangeError)?; } - gst::StateChange::PausedToPlaying => match self.start(element) { - Err(_) => return gst::StateChangeReturn::Failure, - Ok(_) => (), - }, _ => (), } - ret + Ok(success) } } diff --git a/gst-plugin-threadshare/src/queue.rs b/gst-plugin-threadshare/src/queue.rs index 595b85f0..f0497c66 100644 --- a/gst-plugin-threadshare/src/queue.rs +++ b/gst-plugin-threadshare/src/queue.rs @@ -127,7 +127,7 @@ struct State { pending_future_id_in: Option, queue: Option, pending_queue: Option<(Option, bool, VecDeque)>, - last_ret: gst::FlowReturn, + last_res: Result, pending_future_cancel: Option>, } @@ -140,7 +140,7 @@ impl Default for State { pending_future_id_in: None, queue: None, pending_queue: None, - last_ret: gst::FlowReturn::Ok, + last_res: Ok(gst::FlowSuccess::Ok), pending_future_cancel: None, } } @@ -177,7 +177,7 @@ impl Queue { _pad: &gst::Pad, element: &gst::Element, item: DataQueueItem, - ) -> gst::FlowReturn { + ) -> Result { let wait_future = { let mut state = self.state.lock().unwrap(); let State { @@ -187,10 +187,8 @@ impl Queue { pending_future_id_in, .. } = *state; - let queue = match *queue { - None => return gst::FlowReturn::Error, - Some(ref queue) => queue, - }; + + let queue = queue.as_ref().ok_or(gst::FlowError::Error)?; let item = match pending_queue { None => queue.push(item), @@ -331,20 +329,17 @@ impl Queue { if let Some(wait_future) = wait_future { gst_log!(self.cat, obj: element, "Blocking until queue becomes empty"); - match executor::current_thread::block_on_all(wait_future) { - Err(_) => { - gst_element_error!( - element, - gst::StreamError::Failed, - ["failed to wait for queue to become empty again"] - ); - return gst::FlowReturn::Error; - } - Ok(_) => (), - } + executor::current_thread::block_on_all(wait_future).map_err(|_| { + gst_element_error!( + element, + gst::StreamError::Failed, + ["failed to wait for queue to become empty again"] + ); + gst::FlowError::Error + })?; } - self.state.lock().unwrap().last_ret + self.state.lock().unwrap().last_res } fn sink_chain( @@ -352,7 +347,7 @@ impl Queue { pad: &gst::Pad, element: &gst::Element, buffer: gst::Buffer, - ) -> gst::FlowReturn { + ) -> Result { gst_log!(self.cat, obj: pad, "Handling buffer {:?}", buffer); self.enqueue_item(pad, element, DataQueueItem::Buffer(buffer)) } @@ -362,7 +357,7 @@ impl Queue { pad: &gst::Pad, element: &gst::Element, list: gst::BufferList, - ) -> gst::FlowReturn { + ) -> Result { gst_log!(self.cat, obj: pad, "Handling buffer list {:?}", list); self.enqueue_item(pad, element, DataQueueItem::BufferList(list)) } @@ -378,9 +373,9 @@ impl Queue { let _ = self.stop(element); } EventView::FlushStop(..) => { - let (ret, state, pending) = element.get_state(0.into()); - if ret == gst::StateChangeReturn::Success && state == gst::State::Paused - || ret == gst::StateChangeReturn::Async && pending == gst::State::Paused + let (res, state, pending) = element.get_state(0.into()); + if res == Ok(gst::StateChangeSuccess::Success) && state == gst::State::Paused + || res == Ok(gst::StateChangeSuccess::Async) && pending == gst::State::Paused { let _ = self.start(element); } @@ -453,9 +448,9 @@ impl Queue { let _ = self.stop(element); } EventView::FlushStop(..) => { - let (ret, state, pending) = element.get_state(0.into()); - if ret == gst::StateChangeReturn::Success && state == gst::State::Playing - || ret == gst::StateChangeReturn::Async && pending == gst::State::Playing + let (res, state, pending) = element.get_state(0.into()); + if res == Ok(gst::StateChangeSuccess::Success) && state == gst::State::Playing + || res == Ok(gst::StateChangeSuccess::Async) && pending == gst::State::Playing { let _ = self.start(element); } @@ -534,11 +529,11 @@ impl Queue { let res = match item { DataQueueItem::Buffer(buffer) => { gst_log!(self.cat, obj: element, "Forwarding buffer {:?}", buffer); - self.src_pad.push(buffer).into_result().map(|_| ()) + self.src_pad.push(buffer).map(|_| ()) } DataQueueItem::BufferList(list) => { gst_log!(self.cat, obj: element, "Forwarding buffer list {:?}", list); - self.src_pad.push_list(list).into_result().map(|_| ()) + self.src_pad.push_list(list).map(|_| ()) } DataQueueItem::Event(event) => { gst_log!(self.cat, obj: element, "Forwarding event {:?}", event); @@ -551,7 +546,7 @@ impl Queue { Ok(_) => { gst_log!(self.cat, obj: element, "Successfully pushed item"); let mut state = self.state.lock().unwrap(); - state.last_ret = gst::FlowReturn::Ok; + state.last_res = Ok(gst::FlowSuccess::Ok); Ok(()) } Err(gst::FlowError::Flushing) => { @@ -560,7 +555,7 @@ impl Queue { if let Some(ref queue) = state.queue { queue.pause(); } - state.last_ret = gst::FlowReturn::Flushing; + state.last_res = Err(gst::FlowError::Flushing); Ok(()) } Err(gst::FlowError::Eos) => { @@ -569,7 +564,7 @@ impl Queue { if let Some(ref queue) = state.queue { queue.pause(); } - state.last_ret = gst::FlowReturn::Eos; + state.last_res = Err(gst::FlowError::Eos); Ok(()) } Err(err) => { @@ -581,7 +576,7 @@ impl Queue { ["streaming stopped, reason {}", err] ); let mut state = self.state.lock().unwrap(); - state.last_ret = gst::FlowReturn::from_error(err); + state.last_res = Err(err); Err(gst::FlowError::CustomError) } }; @@ -673,7 +668,7 @@ impl Queue { gst::ResourceError::OpenRead, ["Failed to schedule data queue"] ) - })?;; + })?; let pending_future_id = io_context.acquire_pending_future_id(); gst_debug!( @@ -729,7 +724,7 @@ impl Queue { if let Some(ref queue) = state.queue { queue.unpause(); } - state.last_ret = gst::FlowReturn::Ok; + state.last_res = Ok(gst::FlowSuccess::Ok); gst_debug!(self.cat, obj: element, "Started"); @@ -748,7 +743,7 @@ impl Queue { task.notify(); } let _ = state.pending_future_cancel.take(); - state.last_ret = gst::FlowReturn::Flushing; + state.last_res = Err(gst::FlowError::Flushing); gst_debug!(self.cat, obj: element, "Stopped"); @@ -802,14 +797,14 @@ impl ObjectSubclass for Queue { sink_pad.set_chain_function(|pad, parent, buffer| { Queue::catch_panic_pad_function( parent, - || gst::FlowReturn::Error, + || Err(gst::FlowError::Error), |queue, element| queue.sink_chain(pad, element, buffer), ) }); sink_pad.set_chain_list_function(|pad, parent, list| { Queue::catch_panic_pad_function( parent, - || gst::FlowReturn::Error, + || Err(gst::FlowError::Error), |queue, element| queue.sink_chain_list(pad, element, list), ) }); @@ -930,42 +925,32 @@ impl ElementImpl for Queue { &self, element: &gst::Element, transition: gst::StateChange, - ) -> gst::StateChangeReturn { + ) -> Result { gst_trace!(self.cat, obj: element, "Changing state {:?}", transition); match transition { - gst::StateChange::NullToReady => match self.prepare(element) { - Err(err) => { + gst::StateChange::NullToReady => { + self.prepare(element).map_err(|err| { element.post_error_message(&err); - return gst::StateChangeReturn::Failure; - } - Ok(_) => (), - }, - gst::StateChange::PausedToReady => match self.stop(element) { - Err(_) => return gst::StateChangeReturn::Failure, - Ok(_) => (), - }, - gst::StateChange::ReadyToNull => match self.unprepare(element) { - Err(_) => return gst::StateChangeReturn::Failure, - Ok(_) => (), - }, + gst::StateChangeError + })?; + } + gst::StateChange::PausedToReady => { + self.stop(element).map_err(|_| gst::StateChangeError)?; + } + gst::StateChange::ReadyToNull => { + self.unprepare(element).map_err(|_| gst::StateChangeError)?; + } _ => (), } - let ret = self.parent_change_state(element, transition); - if ret == gst::StateChangeReturn::Failure { - return ret; + let success = self.parent_change_state(element, transition)?; + + if transition == gst::StateChange::ReadyToPaused { + self.start(element).map_err(|_| gst::StateChangeError)?; } - match transition { - gst::StateChange::ReadyToPaused => match self.start(element) { - Err(_) => return gst::StateChangeReturn::Failure, - Ok(_) => (), - }, - _ => (), - } - - ret + Ok(success) } } diff --git a/gst-plugin-threadshare/src/tcpclientsrc.rs b/gst-plugin-threadshare/src/tcpclientsrc.rs index 978699bf..7ed553d7 100644 --- a/gst-plugin-threadshare/src/tcpclientsrc.rs +++ b/gst-plugin-threadshare/src/tcpclientsrc.rs @@ -211,9 +211,9 @@ impl TcpClientSrc { true } EventView::FlushStop(..) => { - let (ret, state, pending) = element.get_state(0.into()); - if ret == gst::StateChangeReturn::Success && state == gst::State::Playing - || ret == gst::StateChangeReturn::Async && pending == gst::State::Playing + let (res, state, pending) = element.get_state(0.into()); + if res == Ok(gst::StateChangeSuccess::Success) && state == gst::State::Playing + || res == Ok(gst::StateChangeSuccess::Async) && pending == gst::State::Playing { let _ = self.start(element); } @@ -341,7 +341,7 @@ impl TcpClientSrc { self.src_pad.push_event(event); } - let res = match self.src_pad.push(buffer).into_result() { + let res = match self.src_pad.push(buffer) { Ok(_) => { gst_log!(self.cat, obj: element, "Successfully pushed buffer"); Ok(()) @@ -706,38 +706,31 @@ impl ElementImpl for TcpClientSrc { &self, element: &gst::Element, transition: gst::StateChange, - ) -> gst::StateChangeReturn { + ) -> Result { gst_trace!(self.cat, obj: element, "Changing state {:?}", transition); match transition { - gst::StateChange::NullToReady => match self.prepare(element) { - Err(err) => { - element.post_error_message(&err); - return gst::StateChangeReturn::Failure; - } - Ok(_) => match self.start(element) { - Err(_) => return gst::StateChangeReturn::Failure, - Ok(_) => (), - }, - }, - gst::StateChange::PlayingToPaused => match self.stop(element) { - Err(_) => return gst::StateChangeReturn::Failure, - Ok(_) => match self.unprepare(element) { - Err(_) => return gst::StateChangeReturn::Failure, - Ok(_) => (), - }, - }, + gst::StateChange::NullToReady => { + self.prepare(element) + .map_err(|err| { + element.post_error_message(&err); + gst::StateChangeError + }) + .and_then(|_| self.start(element).map_err(|_| gst::StateChangeError))?; + } + gst::StateChange::PlayingToPaused => { + self.stop(element) + .and_then(|_| self.unprepare(element)) + .map_err(|_| gst::StateChangeError)?; + } _ => (), } - let mut ret = self.parent_change_state(element, transition); - if ret == gst::StateChangeReturn::Failure { - return ret; - } + let mut success = self.parent_change_state(element, transition)?; match transition { gst::StateChange::ReadyToPaused => { - ret = gst::StateChangeReturn::Success; + success = gst::StateChangeSuccess::Success; } gst::StateChange::PausedToReady => { let mut state = self.state.lock().unwrap(); @@ -746,7 +739,7 @@ impl ElementImpl for TcpClientSrc { _ => (), } - ret + Ok(success) } } diff --git a/gst-plugin-threadshare/src/udpsrc.rs b/gst-plugin-threadshare/src/udpsrc.rs index 2456ffc7..352e5485 100644 --- a/gst-plugin-threadshare/src/udpsrc.rs +++ b/gst-plugin-threadshare/src/udpsrc.rs @@ -328,9 +328,9 @@ impl UdpSrc { true } EventView::FlushStop(..) => { - let (ret, state, pending) = element.get_state(0.into()); - if ret == gst::StateChangeReturn::Success && state == gst::State::Playing - || ret == gst::StateChangeReturn::Async && pending == gst::State::Playing + let (res, state, pending) = element.get_state(0.into()); + if res == Ok(gst::StateChangeSuccess::Success) && state == gst::State::Playing + || res == Ok(gst::StateChangeSuccess::Async) && pending == gst::State::Playing { let _ = self.start(element); } @@ -453,7 +453,7 @@ impl UdpSrc { self.src_pad.push_event(event); } - let res = match self.src_pad.push(buffer).into_result() { + let res = match self.src_pad.push(buffer) { Ok(_) => { gst_log!(self.cat, obj: element, "Successfully pushed buffer"); Ok(()) @@ -1024,41 +1024,34 @@ impl ElementImpl for UdpSrc { &self, element: &gst::Element, transition: gst::StateChange, - ) -> gst::StateChangeReturn { + ) -> Result { gst_trace!(self.cat, obj: element, "Changing state {:?}", transition); match transition { - gst::StateChange::NullToReady => match self.prepare(element) { - Err(err) => { + gst::StateChange::NullToReady => { + self.prepare(element).map_err(|err| { element.post_error_message(&err); - return gst::StateChangeReturn::Failure; - } - Ok(_) => (), - }, - gst::StateChange::PlayingToPaused => match self.stop(element) { - Err(_) => return gst::StateChangeReturn::Failure, - Ok(_) => (), - }, - gst::StateChange::ReadyToNull => match self.unprepare(element) { - Err(_) => return gst::StateChangeReturn::Failure, - Ok(_) => (), - }, + gst::StateChangeError + })?; + } + gst::StateChange::PlayingToPaused => { + self.stop(element).map_err(|_| gst::StateChangeError)?; + } + gst::StateChange::ReadyToNull => { + self.unprepare(element).map_err(|_| gst::StateChangeError)?; + } _ => (), } - let mut ret = self.parent_change_state(element, transition); - if ret == gst::StateChangeReturn::Failure { - return ret; - } + let mut success = self.parent_change_state(element, transition)?; match transition { gst::StateChange::ReadyToPaused => { - ret = gst::StateChangeReturn::NoPreroll; + success = gst::StateChangeSuccess::NoPreroll; + } + gst::StateChange::PausedToPlaying => { + self.start(element).map_err(|_| gst::StateChangeError)?; } - gst::StateChange::PausedToPlaying => match self.start(element) { - Err(_) => return gst::StateChangeReturn::Failure, - Ok(_) => (), - }, gst::StateChange::PausedToReady => { let mut state = self.state.lock().unwrap(); state.need_initial_events = true; @@ -1066,7 +1059,7 @@ impl ElementImpl for UdpSrc { _ => (), } - ret + Ok(success) } } diff --git a/gst-plugin-threadshare/tests/proxy.rs b/gst-plugin-threadshare/tests/proxy.rs index 6ebf1738..4cc88cea 100644 --- a/gst-plugin-threadshare/tests/proxy.rs +++ b/gst-plugin-threadshare/tests/proxy.rs @@ -21,6 +21,8 @@ use glib::prelude::*; extern crate gstreamer as gst; use gst::prelude::*; +extern crate gstreamer_app as gst_app; + use std::sync::{Arc, Mutex}; extern crate gstthreadshare; @@ -59,28 +61,22 @@ fn test_push() { let samples = Arc::new(Mutex::new(Vec::new())); + let appsink = appsink.dynamic_cast::().unwrap(); let samples_clone = samples.clone(); - appsink - .connect("new-sample", true, move |args| { - let appsink = args[0].get::().unwrap(); + appsink.connect_new_sample(move |appsink| { + let sample = appsink + .emit("pull-sample", &[]) + .unwrap() + .unwrap() + .get::() + .unwrap(); - let sample = appsink - .emit("pull-sample", &[]) - .unwrap() - .unwrap() - .get::() - .unwrap(); + samples_clone.lock().unwrap().push(sample); - samples_clone.lock().unwrap().push(sample); + Ok(gst::FlowSuccess::Ok) + }); - Some(gst::FlowReturn::Ok.to_value()) - }) - .unwrap(); - - pipeline - .set_state(gst::State::Playing) - .into_result() - .unwrap(); + pipeline.set_state(gst::State::Playing).unwrap(); let mut eos = false; let bus = pipeline.get_bus().unwrap(); @@ -104,5 +100,5 @@ fn test_push() { assert!(sample.get_buffer().is_some()); } - pipeline.set_state(gst::State::Null).into_result().unwrap(); + pipeline.set_state(gst::State::Null).unwrap(); } diff --git a/gst-plugin-threadshare/tests/queue.rs b/gst-plugin-threadshare/tests/queue.rs index cfd86f81..5046f1a2 100644 --- a/gst-plugin-threadshare/tests/queue.rs +++ b/gst-plugin-threadshare/tests/queue.rs @@ -21,6 +21,8 @@ use glib::prelude::*; extern crate gstreamer as gst; use gst::prelude::*; +extern crate gstreamer_app as gst_app; + use std::sync::{Arc, Mutex}; extern crate gstthreadshare; @@ -54,28 +56,22 @@ fn test_push() { let samples = Arc::new(Mutex::new(Vec::new())); + let appsink = appsink.dynamic_cast::().unwrap(); let samples_clone = samples.clone(); - appsink - .connect("new-sample", true, move |args| { - let appsink = args[0].get::().unwrap(); + appsink.connect_new_sample(move |appsink| { + let sample = appsink + .emit("pull-sample", &[]) + .unwrap() + .unwrap() + .get::() + .unwrap(); - let sample = appsink - .emit("pull-sample", &[]) - .unwrap() - .unwrap() - .get::() - .unwrap(); + samples_clone.lock().unwrap().push(sample); - samples_clone.lock().unwrap().push(sample); + Ok(gst::FlowSuccess::Ok) + }); - Some(gst::FlowReturn::Ok.to_value()) - }) - .unwrap(); - - pipeline - .set_state(gst::State::Playing) - .into_result() - .unwrap(); + pipeline.set_state(gst::State::Playing).unwrap(); let mut eos = false; let bus = pipeline.get_bus().unwrap(); @@ -99,5 +95,5 @@ fn test_push() { assert!(sample.get_buffer().is_some()); } - pipeline.set_state(gst::State::Null).into_result().unwrap(); + pipeline.set_state(gst::State::Null).unwrap(); } diff --git a/gst-plugin-threadshare/tests/tcpclientsrc.rs b/gst-plugin-threadshare/tests/tcpclientsrc.rs index aee442b0..ab3bcb68 100644 --- a/gst-plugin-threadshare/tests/tcpclientsrc.rs +++ b/gst-plugin-threadshare/tests/tcpclientsrc.rs @@ -22,6 +22,8 @@ use glib::prelude::*; extern crate gstreamer as gst; use gst::prelude::*; +extern crate gstreamer_app as gst_app; + use std::io::Write; use std::sync::{Arc, Mutex}; use std::{thread, time}; @@ -75,28 +77,22 @@ fn test_push() { let samples = Arc::new(Mutex::new(Vec::new())); + let appsink = appsink.dynamic_cast::().unwrap(); let samples_clone = samples.clone(); - appsink - .connect("new-sample", true, move |args| { - let appsink = args[0].get::().unwrap(); + appsink.connect_new_sample(move |appsink| { + let sample = appsink + .emit("pull-sample", &[]) + .unwrap() + .unwrap() + .get::() + .unwrap(); - let sample = appsink - .emit("pull-sample", &[]) - .unwrap() - .unwrap() - .get::() - .unwrap(); + let mut samples = samples_clone.lock().unwrap(); + samples.push(sample); + Ok(gst::FlowSuccess::Ok) + }); - let mut samples = samples_clone.lock().unwrap(); - samples.push(sample); - Some(gst::FlowReturn::Ok.to_value()) - }) - .unwrap(); - - pipeline - .set_state(gst::State::Playing) - .into_result() - .unwrap(); + pipeline.set_state(gst::State::Playing).unwrap(); let mut eos = false; let bus = pipeline.get_bus().unwrap(); @@ -123,7 +119,7 @@ fn test_push() { }); assert_eq!(total_received_size, 3 * 160); - pipeline.set_state(gst::State::Null).into_result().unwrap(); + pipeline.set_state(gst::State::Null).unwrap(); handler.join().unwrap(); } diff --git a/gst-plugin-threadshare/tests/udpsrc.rs b/gst-plugin-threadshare/tests/udpsrc.rs index 70eeb963..2ccdd84b 100644 --- a/gst-plugin-threadshare/tests/udpsrc.rs +++ b/gst-plugin-threadshare/tests/udpsrc.rs @@ -164,7 +164,7 @@ fn test_socket_reuse() { for _ in 0..3 { let buffer = ts_src_h.pull().unwrap(); - sink_h.push(buffer).into_result().unwrap(); + sink_h.push(buffer).unwrap(); let buffer = ts_src_h2.pull().unwrap(); assert_eq!(buffer.get_size(), 160); diff --git a/gst-plugin-togglerecord/Cargo.toml b/gst-plugin-togglerecord/Cargo.toml index c1c6873e..f4628704 100644 --- a/gst-plugin-togglerecord/Cargo.toml +++ b/gst-plugin-togglerecord/Cargo.toml @@ -24,4 +24,3 @@ path = "src/lib.rs" name = "gtk-recording" path = "examples/gtk_recording.rs" required-features = ["gtk", "gio"] - diff --git a/gst-plugin-togglerecord/examples/gtk_recording.rs b/gst-plugin-togglerecord/examples/gtk_recording.rs index b6d56e6c..336f0705 100644 --- a/gst-plugin-togglerecord/examples/gtk_recording.rs +++ b/gst-plugin-togglerecord/examples/gtk_recording.rs @@ -318,15 +318,13 @@ fn create_ui(app: >k::Application) { glib::Continue(true) }); - let ret = pipeline.set_state(gst::State::Playing); - assert_ne!(ret, gst::StateChangeReturn::Failure); + pipeline.set_state(gst::State::Playing).unwrap(); // Pipeline reference is owned by the closure below, so will be // destroyed once the app is destroyed let timeout_id = RefCell::new(Some(timeout_id)); app.connect_shutdown(move |_| { - let ret = pipeline.set_state(gst::State::Null); - assert_ne!(ret, gst::StateChangeReturn::Failure); + pipeline.set_state(gst::State::Null).unwrap(); bus.remove_watch(); diff --git a/gst-plugin-togglerecord/src/togglerecord.rs b/gst-plugin-togglerecord/src/togglerecord.rs index 304d4773..70aff3cf 100644 --- a/gst-plugin-togglerecord/src/togglerecord.rs +++ b/gst-plugin-togglerecord/src/togglerecord.rs @@ -187,7 +187,7 @@ impl ToggleRecord { sinkpad.set_chain_function(|pad, parent, buffer| { ToggleRecord::catch_panic_pad_function( parent, - || gst::FlowReturn::Error, + || Err(gst::FlowError::Error), |togglerecord, element| togglerecord.sink_chain(pad, element, buffer), ) }); @@ -651,25 +651,27 @@ impl ToggleRecord { pad: &gst::Pad, element: &gst::Element, buffer: gst::Buffer, - ) -> gst::FlowReturn { - let stream = match self.pads.lock().get(pad) { - None => { + ) -> Result { + let stream = self + .pads + .lock() + .get(pad) + .map(|stream| stream.clone()) + .ok_or_else(|| { gst_element_error!( element, gst::CoreError::Pad, ["Unknown pad {:?}", pad.get_name()] ); - return gst::FlowReturn::Error; - } - Some(stream) => stream.clone(), - }; + gst::FlowError::Error + })?; gst_log!(self.cat, obj: pad, "Handling buffer {:?}", buffer); { let state = stream.state.lock(); if state.eos { - return gst::FlowReturn::Eos; + return Err(gst::FlowError::Eos); } } @@ -682,19 +684,19 @@ impl ToggleRecord { gst::StreamError::Format, ["DTS != PTS not supported for secondary streams"] ); - return gst::FlowReturn::Error; + return Err(gst::FlowError::Error); } - if !pts.is_some() { + pts.ok_or_else(|| { gst_element_error!(element, gst::StreamError::Format, ["Buffer without PTS"]); - return gst::FlowReturn::Error; - } + gst::FlowError::Error + })?; if buffer.get_flags().contains(gst::BufferFlags::DELTA_UNIT) { gst_element_error!( element, gst::StreamError::Format, ["Delta-units not supported for secondary streams"] ); - return gst::FlowReturn::Error; + return Err(gst::FlowError::Error); } self.handle_secondary_stream(pad, &stream, pts, buffer.get_duration()) @@ -706,7 +708,7 @@ impl ToggleRecord { gst::StreamError::Format, ["Buffer without DTS or PTS"] ); - return gst::FlowReturn::Error; + return Err(gst::FlowError::Error); } self.handle_main_stream( @@ -721,10 +723,10 @@ impl ToggleRecord { match handle_result { HandleResult::Drop => { - return gst::FlowReturn::Ok; + return Ok(gst::FlowSuccess::Ok); } HandleResult::Flushing => { - return gst::FlowReturn::Flushing; + return Err(gst::FlowError::Flushing); } HandleResult::Eos => { stream.srcpad.push_event( @@ -732,7 +734,7 @@ impl ToggleRecord { .seqnum(stream.state.lock().segment_seqnum) .build(), ); - return gst::FlowReturn::Eos; + return Err(gst::FlowError::Eos); } HandleResult::Pass => { // Pass through and actually push the buffer @@ -1271,7 +1273,7 @@ impl ElementImpl for ToggleRecord { &self, element: &gst::Element, transition: gst::StateChange, - ) -> gst::StateChangeReturn { + ) -> Result { gst_trace!(self.cat, obj: element, "Changing state {:?}", transition); match transition { @@ -1303,34 +1305,28 @@ impl ElementImpl for ToggleRecord { _ => (), } - let ret = self.parent_change_state(element, transition); - if ret == gst::StateChangeReturn::Failure { - return ret; - } + let success = self.parent_change_state(element, transition)?; - match transition { - gst::StateChange::PausedToReady => { - for s in self - .other_streams - .lock() - .0 - .iter() - .chain(iter::once(&self.main_stream)) - { - let mut state = s.state.lock(); + if transition == gst::StateChange::PausedToReady { + for s in self + .other_streams + .lock() + .0 + .iter() + .chain(iter::once(&self.main_stream)) + { + let mut state = s.state.lock(); - state.pending_events.clear(); - } - - let mut rec_state = self.state.lock(); - *rec_state = State::default(); - drop(rec_state); - element.notify("recording"); + state.pending_events.clear(); } - _ => (), + + let mut rec_state = self.state.lock(); + *rec_state = State::default(); + drop(rec_state); + element.notify("recording"); } - ret + Ok(success) } fn request_new_pad( diff --git a/gst-plugin-togglerecord/tests/tests.rs b/gst-plugin-togglerecord/tests/tests.rs index a00994a9..f60e417b 100644 --- a/gst-plugin-togglerecord/tests/tests.rs +++ b/gst-plugin-togglerecord/tests/tests.rs @@ -73,7 +73,7 @@ fn setup_sender_receiver( }; let fakesink_sinkpad = fakesink.get_static_pad("sink").unwrap(); - srcpad.link(&fakesink_sinkpad).into_result().unwrap(); + srcpad.link(&fakesink_sinkpad).unwrap(); let (sender_output, receiver_output) = mpsc::channel::>(); let sender_output = Mutex::new(sender_output); @@ -254,10 +254,7 @@ fn test_one_stream_open() { let (sender_input, _, receiver_output, thread) = setup_sender_receiver(&pipeline, &togglerecord, "src", 0.into()); - pipeline - .set_state(gst::State::Playing) - .into_result() - .unwrap(); + pipeline.set_state(gst::State::Playing).unwrap(); togglerecord.set_property("record", &true).unwrap(); sender_input.send(SendData::Buffers(10)).unwrap(); @@ -274,7 +271,7 @@ fn test_one_stream_open() { thread.join().unwrap(); - pipeline.set_state(gst::State::Null).into_result().unwrap(); + pipeline.set_state(gst::State::Null).unwrap(); } #[test] @@ -288,10 +285,7 @@ fn test_one_stream_gaps_open() { let (sender_input, _, receiver_output, thread) = setup_sender_receiver(&pipeline, &togglerecord, "src", 0.into()); - pipeline - .set_state(gst::State::Playing) - .into_result() - .unwrap(); + pipeline.set_state(gst::State::Playing).unwrap(); togglerecord.set_property("record", &true).unwrap(); sender_input.send(SendData::Buffers(5)).unwrap(); @@ -309,7 +303,7 @@ fn test_one_stream_gaps_open() { thread.join().unwrap(); - pipeline.set_state(gst::State::Null).into_result().unwrap(); + pipeline.set_state(gst::State::Null).unwrap(); } #[test] @@ -323,10 +317,7 @@ fn test_one_stream_close_open() { let (sender_input, receiver_input_done, receiver_output, thread) = setup_sender_receiver(&pipeline, &togglerecord, "src", 0.into()); - pipeline - .set_state(gst::State::Playing) - .into_result() - .unwrap(); + pipeline.set_state(gst::State::Playing).unwrap(); sender_input.send(SendData::Buffers(10)).unwrap(); receiver_input_done.recv().unwrap(); @@ -345,7 +336,7 @@ fn test_one_stream_close_open() { thread.join().unwrap(); - pipeline.set_state(gst::State::Null).into_result().unwrap(); + pipeline.set_state(gst::State::Null).unwrap(); } #[test] @@ -359,10 +350,7 @@ fn test_one_stream_open_close() { let (sender_input, receiver_input_done, receiver_output, thread) = setup_sender_receiver(&pipeline, &togglerecord, "src", 0.into()); - pipeline - .set_state(gst::State::Playing) - .into_result() - .unwrap(); + pipeline.set_state(gst::State::Playing).unwrap(); togglerecord.set_property("record", &true).unwrap(); sender_input.send(SendData::Buffers(10)).unwrap(); @@ -382,7 +370,7 @@ fn test_one_stream_open_close() { thread.join().unwrap(); - pipeline.set_state(gst::State::Null).into_result().unwrap(); + pipeline.set_state(gst::State::Null).unwrap(); } #[test] @@ -396,10 +384,7 @@ fn test_one_stream_open_close_open() { let (sender_input, receiver_input_done, receiver_output, thread) = setup_sender_receiver(&pipeline, &togglerecord, "src", 0.into()); - pipeline - .set_state(gst::State::Playing) - .into_result() - .unwrap(); + pipeline.set_state(gst::State::Playing).unwrap(); togglerecord.set_property("record", &true).unwrap(); sender_input.send(SendData::Buffers(10)).unwrap(); @@ -428,7 +413,7 @@ fn test_one_stream_open_close_open() { thread.join().unwrap(); - pipeline.set_state(gst::State::Null).into_result().unwrap(); + pipeline.set_state(gst::State::Null).unwrap(); } #[test] @@ -444,10 +429,7 @@ fn test_two_stream_open() { let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) = setup_sender_receiver(&pipeline, &togglerecord, "src_%u", 0.into()); - pipeline - .set_state(gst::State::Playing) - .into_result() - .unwrap(); + pipeline.set_state(gst::State::Playing).unwrap(); togglerecord.set_property("record", &true).unwrap(); @@ -482,7 +464,7 @@ fn test_two_stream_open() { thread_1.join().unwrap(); thread_2.join().unwrap(); - pipeline.set_state(gst::State::Null).into_result().unwrap(); + pipeline.set_state(gst::State::Null).unwrap(); } #[test] @@ -498,10 +480,7 @@ fn test_two_stream_open_shift() { let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) = setup_sender_receiver(&pipeline, &togglerecord, "src_%u", 5 * gst::MSECOND); - pipeline - .set_state(gst::State::Playing) - .into_result() - .unwrap(); + pipeline.set_state(gst::State::Playing).unwrap(); togglerecord.set_property("record", &true).unwrap(); @@ -536,7 +515,7 @@ fn test_two_stream_open_shift() { thread_1.join().unwrap(); thread_2.join().unwrap(); - pipeline.set_state(gst::State::Null).into_result().unwrap(); + pipeline.set_state(gst::State::Null).unwrap(); } #[test] @@ -552,10 +531,7 @@ fn test_two_stream_open_shift_main() { let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) = setup_sender_receiver(&pipeline, &togglerecord, "src_%u", 0.into()); - pipeline - .set_state(gst::State::Playing) - .into_result() - .unwrap(); + pipeline.set_state(gst::State::Playing).unwrap(); togglerecord.set_property("record", &true).unwrap(); @@ -591,7 +567,7 @@ fn test_two_stream_open_shift_main() { thread_1.join().unwrap(); thread_2.join().unwrap(); - pipeline.set_state(gst::State::Null).into_result().unwrap(); + pipeline.set_state(gst::State::Null).unwrap(); } #[test] @@ -607,10 +583,7 @@ fn test_two_stream_open_close() { let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) = setup_sender_receiver(&pipeline, &togglerecord, "src_%u", 0.into()); - pipeline - .set_state(gst::State::Playing) - .into_result() - .unwrap(); + pipeline.set_state(gst::State::Playing).unwrap(); togglerecord.set_property("record", &true).unwrap(); @@ -661,7 +634,7 @@ fn test_two_stream_open_close() { thread_1.join().unwrap(); thread_2.join().unwrap(); - pipeline.set_state(gst::State::Null).into_result().unwrap(); + pipeline.set_state(gst::State::Null).unwrap(); } #[test] @@ -677,10 +650,7 @@ fn test_two_stream_close_open() { let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) = setup_sender_receiver(&pipeline, &togglerecord, "src_%u", 0.into()); - pipeline - .set_state(gst::State::Playing) - .into_result() - .unwrap(); + pipeline.set_state(gst::State::Playing).unwrap(); togglerecord.set_property("record", &false).unwrap(); @@ -731,7 +701,7 @@ fn test_two_stream_close_open() { thread_1.join().unwrap(); thread_2.join().unwrap(); - pipeline.set_state(gst::State::Null).into_result().unwrap(); + pipeline.set_state(gst::State::Null).unwrap(); } #[test] @@ -747,10 +717,7 @@ fn test_two_stream_open_close_open() { let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) = setup_sender_receiver(&pipeline, &togglerecord, "src_%u", 0.into()); - pipeline - .set_state(gst::State::Playing) - .into_result() - .unwrap(); + pipeline.set_state(gst::State::Playing).unwrap(); togglerecord.set_property("record", &true).unwrap(); @@ -826,7 +793,7 @@ fn test_two_stream_open_close_open() { thread_1.join().unwrap(); thread_2.join().unwrap(); - pipeline.set_state(gst::State::Null).into_result().unwrap(); + pipeline.set_state(gst::State::Null).unwrap(); } #[test] @@ -842,10 +809,7 @@ fn test_two_stream_open_close_open_gaps() { let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) = setup_sender_receiver(&pipeline, &togglerecord, "src_%u", 0.into()); - pipeline - .set_state(gst::State::Playing) - .into_result() - .unwrap(); + pipeline.set_state(gst::State::Playing).unwrap(); togglerecord.set_property("record", &true).unwrap(); @@ -927,7 +891,7 @@ fn test_two_stream_open_close_open_gaps() { thread_1.join().unwrap(); thread_2.join().unwrap(); - pipeline.set_state(gst::State::Null).into_result().unwrap(); + pipeline.set_state(gst::State::Null).unwrap(); } #[test] @@ -943,10 +907,7 @@ fn test_two_stream_close_open_close_delta() { let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) = setup_sender_receiver(&pipeline, &togglerecord, "src_%u", 0.into()); - pipeline - .set_state(gst::State::Playing) - .into_result() - .unwrap(); + pipeline.set_state(gst::State::Playing).unwrap(); togglerecord.set_property("record", &false).unwrap(); @@ -1017,7 +978,7 @@ fn test_two_stream_close_open_close_delta() { thread_1.join().unwrap(); thread_2.join().unwrap(); - pipeline.set_state(gst::State::Null).into_result().unwrap(); + pipeline.set_state(gst::State::Null).unwrap(); } #[test] @@ -1035,10 +996,7 @@ fn test_three_stream_open_close_open() { let (sender_input_3, receiver_input_done_3, receiver_output_3, thread_3) = setup_sender_receiver(&pipeline, &togglerecord, "src_%u", 0.into()); - pipeline - .set_state(gst::State::Playing) - .into_result() - .unwrap(); + pipeline.set_state(gst::State::Playing).unwrap(); togglerecord.set_property("record", &true).unwrap(); @@ -1142,5 +1100,5 @@ fn test_three_stream_open_close_open() { thread_2.join().unwrap(); thread_3.join().unwrap(); - pipeline.set_state(gst::State::Null).into_result().unwrap(); + pipeline.set_state(gst::State::Null).unwrap(); } diff --git a/gst-plugin-tutorial/src/identity.rs b/gst-plugin-tutorial/src/identity.rs index db4163e4..4e86d46e 100644 --- a/gst-plugin-tutorial/src/identity.rs +++ b/gst-plugin-tutorial/src/identity.rs @@ -35,7 +35,7 @@ impl Identity { sinkpad.set_chain_function(|pad, parent, buffer| { Identity::catch_panic_pad_function( parent, - || gst::FlowReturn::Error, + || Err(gst::FlowError::Error), |identity, element| identity.sink_chain(pad, element, buffer), ) }); @@ -81,7 +81,7 @@ impl Identity { pad: &gst::Pad, _element: &gst::Element, buffer: gst::Buffer, - ) -> gst::FlowReturn { + ) -> Result { gst_log!(self.cat, obj: pad, "Handling buffer {:?}", buffer); self.srcpad.push(buffer) } @@ -260,16 +260,11 @@ impl ElementImpl for Identity { &self, element: &gst::Element, transition: gst::StateChange, - ) -> gst::StateChangeReturn { + ) -> Result { gst_trace!(self.cat, obj: element, "Changing state {:?}", transition); // Call the parent class' implementation of ::change_state() - let ret = self.parent_change_state(element, transition); - if ret == gst::StateChangeReturn::Failure { - return ret; - } - - ret + self.parent_change_state(element, transition) } } diff --git a/gst-plugin-tutorial/src/rgb2gray.rs b/gst-plugin-tutorial/src/rgb2gray.rs index 4c4e7a27..7f102911 100644 --- a/gst-plugin-tutorial/src/rgb2gray.rs +++ b/gst-plugin-tutorial/src/rgb2gray.rs @@ -416,7 +416,7 @@ impl BaseTransformImpl for Rgb2Gray { element: &gst_base::BaseTransform, inbuf: &gst::Buffer, outbuf: &mut gst::BufferRef, - ) -> gst::FlowReturn { + ) -> Result { // Keep a local copy of the values of all our properties at this very moment. This // ensures that the mutex is never locked for long and the application wouldn't // have to block until this function returns when getting/setting property values @@ -424,13 +424,10 @@ impl BaseTransformImpl for Rgb2Gray { // Get a locked reference to our state, i.e. the input and output VideoInfo let mut state_guard = self.state.lock().unwrap(); - let state = match *state_guard { - None => { - gst_element_error!(element, gst::CoreError::Negotiation, ["Have no state yet"]); - return gst::FlowReturn::NotNegotiated; - } - Some(ref mut state) => state, - }; + let state = state_guard.as_mut().ok_or_else(|| { + gst_element_error!(element, gst::CoreError::Negotiation, ["Have no state yet"]); + gst::FlowError::NotNegotiated + })?; // Map the input buffer as a VideoFrameRef. This is similar to directly mapping // the buffer with inbuf.map_readable() but in addition extracts various video @@ -440,34 +437,28 @@ impl BaseTransformImpl for Rgb2Gray { // // This fails if the buffer can't be read or is invalid in relation to the video // info that is passed here - let in_frame = match gst_video::VideoFrameRef::from_buffer_ref_readable( - inbuf.as_ref(), - &state.in_info, - ) { - None => { - gst_element_error!( - element, - gst::CoreError::Failed, - ["Failed to map input buffer readable"] - ); - return gst::FlowReturn::Error; - } - Some(in_frame) => in_frame, - }; + let in_frame = + gst_video::VideoFrameRef::from_buffer_ref_readable(inbuf.as_ref(), &state.in_info) + .ok_or_else(|| { + gst_element_error!( + element, + gst::CoreError::Failed, + ["Failed to map input buffer readable"] + ); + gst::FlowError::Error + })?; // And now map the output buffer writable, so we can fill it. let mut out_frame = - match gst_video::VideoFrameRef::from_buffer_ref_writable(outbuf, &state.out_info) { - None => { + gst_video::VideoFrameRef::from_buffer_ref_writable(outbuf, &state.out_info) + .ok_or_else(|| { gst_element_error!( element, gst::CoreError::Failed, ["Failed to map output buffer writable"] ); - return gst::FlowReturn::Error; - } - Some(out_frame) => out_frame, - }; + gst::FlowError::Error + })?; // Keep the various metadata we need for working with the video frames in // local variables. This saves some typing below. @@ -566,7 +557,7 @@ impl BaseTransformImpl for Rgb2Gray { unimplemented!(); } - gst::FlowReturn::Ok + Ok(gst::FlowSuccess::Ok) } } diff --git a/gst-plugin-tutorial/src/sinesrc.rs b/gst-plugin-tutorial/src/sinesrc.rs index 778f1b05..a819c5bb 100644 --- a/gst-plugin-tutorial/src/sinesrc.rs +++ b/gst-plugin-tutorial/src/sinesrc.rs @@ -409,7 +409,7 @@ impl ElementImpl for SineSrc { &self, element: &gst::Element, transition: gst::StateChange, - ) -> gst::StateChangeReturn { + ) -> Result { let basesrc = element.downcast_ref::().unwrap(); // Configure live'ness once here just before starting the source @@ -685,7 +685,7 @@ impl BaseSrcImpl for SineSrc { // If the clock ID was unscheduled, unlock() was called // and we should return Flushing immediately. - if res == gst::ClockReturn::Unscheduled { + if res == Err(gst::ClockError::Unscheduled) { gst_debug!(self.cat, obj: element, "Flushing"); return Err(gst::FlowError::Flushing); }