From 1c47285c9c07f59af9993e120b3ef10f5868be1f Mon Sep 17 00:00:00 2001
From: Rafael Caricio
Date: Sat, 13 Mar 2021 16:15:28 +0100
Subject: [PATCH] Initial version

---
 Cargo.toml          |   6 ++
 README.md           |  28 +++
 src/file.rs         |  39 +++
 src/lib.rs          |  11 +--
 src/video_stream.rs | 195 ++++++++++++++++++++++++++++++++++++++++++++
 5 files changed, 272 insertions(+), 7 deletions(-)
 create mode 100644 README.md
 create mode 100644 src/file.rs
 create mode 100644 src/video_stream.rs

diff --git a/Cargo.toml b/Cargo.toml
index 4d2c5e7..095ad0b 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -3,7 +3,13 @@ name = "vid2img"
 version = "0.1.0"
 authors = ["Rafael Caricio "]
 edition = "2018"
+license = "MIT"
 
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
 
 [dependencies]
+glib = "0.10.1"
+gstreamer = "0.16.2"
+gstreamer-app = "0.16.0"
+gstreamer-video = "0.16.0"
+log = "0.4"
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..4515d2f
--- /dev/null
+++ b/README.md
@@ -0,0 +1,28 @@
+# Vid2Img - Video to Image
+
+Vid2img is a Rust crate that allows the use of a video file as a collection of frame images. This crate exposes
+a `FileSource` type that accepts a video file path and the desired size of the frames; you can then convert the
+instance into an iterator (`.into_iter()`). On every iteration you will receive the data of one video frame, encoded as PNG.
+
+
+```rust
+use std::path::Path;
+use vid2img::FileSource;
+
+fn main() {
+    let file_path = Path::new("video.mp4");
+
+    let frame_source = FileSource::new(file_path, (200, 200)).unwrap();
+    for frame in frame_source.into_iter() {
+        if let Ok(Some(png_img_data)) = frame {
+            // do something with the image data here ...
+        }
+    }
+}
+```
+
+We use [GStreamer](https://gstreamer.freedesktop.org/) for processing the video and capturing the frames. We make use
+of the official [Rust wrapper](https://gitlab.freedesktop.org/gstreamer/gstreamer-rs) for the GStreamer API.
+
+## Installation
+As we use GStreamer, the [installation steps](https://gitlab.freedesktop.org/gstreamer/gstreamer-rs#installation) for the GStreamer-rs crate must be followed.
diff --git a/src/file.rs b/src/file.rs
new file mode 100644
index 0000000..1ce770b
--- /dev/null
+++ b/src/file.rs
@@ -0,0 +1,39 @@
+use crate::StreamError;
+use crate::{FrameData, VideoStream, VideoStreamIterator};
+use std::path::{Path, PathBuf};
+
+pub struct FileSource {
+    source: PathBuf,
+    frame_size: (u32, u32),
+}
+
+impl FileSource {
+    pub fn new(source: &Path, frame_size: (u32, u32)) -> Result<Self, CaptureError> {
+        if !source.exists() {
+            return Err(CaptureError::FileNotFound);
+        }
+        Ok(Self {
+            source: source.to_path_buf(),
+            frame_size,
+        })
+    }
+}
+
+impl IntoIterator for FileSource {
+    type Item = Result<Option<FrameData>, StreamError>;
+    type IntoIter = VideoStreamIterator;
+
+    fn into_iter(self) -> Self::IntoIter {
+        let pipeline_description = format!(
+            "uridecodebin uri=file://{} ! videoconvert ! videoscale ! capsfilter caps=\"video/x-raw, width={}, height={}\"",
+            self.source.to_string_lossy(),
+            self.frame_size.0,
+            self.frame_size.1
+        );
+        VideoStream::new(pipeline_description).into_iter()
+    }
+}
+
+pub enum CaptureError {
+    FileNotFound,
+}
diff --git a/src/lib.rs b/src/lib.rs
index 31e1bb2..438839f 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -1,7 +1,4 @@
-#[cfg(test)]
-mod tests {
-    #[test]
-    fn it_works() {
-        assert_eq!(2 + 2, 4);
-    }
-}
+pub mod file;
+mod video_stream;
+
+pub use video_stream::*;
diff --git a/src/video_stream.rs b/src/video_stream.rs
new file mode 100644
index 0000000..0a4d5e5
--- /dev/null
+++ b/src/video_stream.rs
@@ -0,0 +1,195 @@
+use gst::gst_element_error;
+use gst::prelude::*;
+use gstreamer as gst;
+use gstreamer_app as gst_app;
+use std::sync::mpsc::{sync_channel, Receiver, TryRecvError, TrySendError};
+
+pub type FrameData = Vec<u8>;
+
+pub struct VideoStream {
+    pipeline_description: String,
+}
+
+impl VideoStream {
+    pub fn new<S: AsRef<str>>(pipeline_description: S) -> Self {
+        Self {
+            pipeline_description: String::from(pipeline_description.as_ref()),
+        }
+    }
+}
+
+pub struct GstErrorMessage {
+    pub src: String,
+    pub error: String,
+    pub debug: Option<String>,
+    pub source: glib::Error,
+}
+
+pub enum StreamError {
+    GstError(GstErrorMessage),
+    FrameCaptureError,
+}
+
+impl IntoIterator for VideoStream {
+    type Item = Result<Option<FrameData>, StreamError>;
+    type IntoIter = VideoStreamIterator;
+
+    fn into_iter(self) -> Self::IntoIter {
+        let (sender, receiver) = sync_channel(1);
+
+        log::debug!("Creating GStreamer Pipeline..");
+        let pipeline = gst::parse_launch(
+            format!(
+                "{} ! pngenc snapshot=false ! appsink name=sink",
+                self.pipeline_description
+            )
+            .as_str(),
+        )
+        .expect("Pipeline description invalid, cannot create")
+        .downcast::<gst::Pipeline>()
+        .expect("Expected a gst::Pipeline");
+
+        // Get access to the appsink element.
+        let appsink = pipeline
+            .get_by_name("sink")
+            .expect("Sink element not found")
+            .downcast::<gst_app::AppSink>()
+            .expect("Sink element is expected to be an appsink!");
+
+        appsink
+            .set_property("sync", &false)
+            .expect("Failed to disable gst pipeline sync");
+        appsink.set_callbacks(
+            gst_app::AppSinkCallbacks::builder()
+                .new_sample(move |appsink| {
+                    // Pull the sample in question out of the appsink's buffer.
+                    let sample = appsink.pull_sample().map_err(|_| gst::FlowError::Eos)?;
+                    let buffer_ref = sample.get_buffer().ok_or_else(|| {
+                        gst_element_error!(
+                            appsink,
+                            gst::ResourceError::Failed,
+                            ("Failed to get buffer from appsink")
+                        );
+
+                        if let Err(err) = sender.try_send(Err(StreamError::FrameCaptureError)) {
+                            log::error!("Could not send message in stream: {}", err)
+                        }
+
+                        gst::FlowError::Error
+                    })?;
+
+                    // At this point, buffer is only a reference to an existing memory region somewhere.
+                    // When we want to access its content, we have to map it while requesting the required
+                    // mode of access (read, read/write).
+                    // This type of abstraction is necessary, because the buffer in question might not be
+                    // on the machine's main memory itself, but rather in the GPU's memory.
+                    // So mapping the buffer makes the underlying memory region accessible to us.
+                    // See: https://gstreamer.freedesktop.org/documentation/plugin-development/advanced/allocation.html
+                    let buffer = buffer_ref.map_readable().map_err(|_| {
+                        gst_element_error!(
+                            appsink,
+                            gst::ResourceError::Failed,
+                            ("Failed to map buffer readable")
+                        );
+
+                        if let Err(err) = sender.try_send(Err(StreamError::FrameCaptureError)) {
+                            log::error!("Could not send message in stream: {}", err)
+                        }
+
+                        gst::FlowError::Error
+                    })?;
+                    log::trace!("Frame extracted from pipeline");
+
+                    match sender.try_send(Ok(Some(buffer.to_vec()))) {
+                        Ok(_) => Ok(gst::FlowSuccess::Ok),
+                        Err(TrySendError::Full(_)) => {
+                            log::trace!("Channel is full, discarded frame");
+                            Ok(gst::FlowSuccess::Ok)
+                        }
+                        Err(TrySendError::Disconnected(_)) => {
+                            log::debug!("Returning EOS in pipeline callback fn");
+                            Err(gst::FlowError::Eos)
+                        }
+                    }
+                })
+                .build(),
+        );
+
+        let bus = pipeline
+            .get_bus()
+            .expect("Pipeline without bus. Shouldn't happen!");
+
+        pipeline
+            .set_state(gst::State::Playing)
+            .expect("Cannot start pipeline");
+        log::info!("Pipeline started: {}", self.pipeline_description);
+
+        VideoStreamIterator {
+            description: self.pipeline_description,
+            receiver,
+            pipeline,
+            bus,
+        }
+    }
+}
+
+pub struct VideoStreamIterator {
+    description: String,
+    receiver: Receiver<Result<Option<FrameData>, StreamError>>,
+    pipeline: gst::Pipeline,
+    bus: gst::Bus,
+}
+
+impl Iterator for VideoStreamIterator {
+    type Item = Result<Option<FrameData>, StreamError>;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        match self.receiver.try_recv() {
+            Ok(event) => return Some(event),
+            Err(TryRecvError::Empty) => {
+                // Check if there are errors in the GStreamer pipeline itself.
+                if let Some(msg) = self.bus.pop() {
+                    use gst::MessageView;
+
+                    match msg.view() {
+                        MessageView::Eos(..) => {
+                            // The End-of-stream message is posted when the stream is done, either because
+                            // the video ended or because we returned gst::FlowError::Eos from the
+                            // appsink callback.
+                            return None;
+                        }
+                        MessageView::Error(err) => {
+                            let error_msg = GstErrorMessage {
+                                src: msg
+                                    .get_src()
+                                    .map(|s| String::from(s.get_path_string()))
+                                    .unwrap_or_else(|| String::from("None")),
+                                error: err.get_error().to_string(),
+                                debug: err.get_debug(),
+                                source: err.get_error(),
+                            };
+                            return Some(Err(StreamError::GstError(error_msg)));
+                        }
+                        _ => (),
+                    }
+                }
+            }
+            Err(TryRecvError::Disconnected) => {
+                log::debug!("The Pipeline channel is disconnected: {}", self.description);
+                return None;
+            }
+        }
+        // Nothing to report in this iteration.
+        // Frames could not be captured, but there are no errors in the pipeline.
+        Some(Ok(None))
+    }
+}
+
+impl Drop for VideoStreamIterator {
+    fn drop(&mut self) {
+        if self.pipeline.set_state(gst::State::Null).is_err() {
+            log::error!("Could not stop pipeline");
+        }
+        log::debug!("Pipeline stopped!");
+    }
+}