diff --git a/examples/Cargo.toml b/examples/Cargo.toml index 3959b9123..8ad2a028b 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -23,19 +23,23 @@ ges = { package = "gstreamer-editing-services", path = "../gstreamer-editing-ser gst-sdp = { package = "gstreamer-sdp", path = "../gstreamer-sdp", optional = true } gst-rtsp = { package = "gstreamer-rtsp", path = "../gstreamer-rtsp", optional = true } gst-rtsp-server = { package = "gstreamer-rtsp-server", path = "../gstreamer-rtsp-server", optional = true } +gst-allocators = { package = "gstreamer-allocators", path = "../gstreamer-allocators", optional = true } gtk = { git = "https://github.com/gtk-rs/gtk3-rs", optional = true } gdk = { git = "https://github.com/gtk-rs/gtk3-rs", optional = true } -gio = { git = "https://github.com/gtk-rs/gtk-rs-core" , optional = true } +gio = { git = "https://github.com/gtk-rs/gtk-rs-core", optional = true } anyhow = "1.0" derive_more = "0.99.5" futures = "0.3" byte-slice-cast = "1" -cairo-rs = { git = "https://github.com/gtk-rs/gtk-rs-core" , features=["use_glib"], optional = true } -pango = { git = "https://github.com/gtk-rs/gtk-rs-core" , optional = true } -pangocairo = { git = "https://github.com/gtk-rs/gtk-rs-core" , optional = true } +cairo-rs = { git = "https://github.com/gtk-rs/gtk-rs-core", features=["use_glib"], optional = true } +pango = { git = "https://github.com/gtk-rs/gtk-rs-core", optional = true } +pangocairo = { git = "https://github.com/gtk-rs/gtk-rs-core", optional = true } glutin = { version = "0.28", optional = true } once_cell = "1.0" -image = { version="0.24", optional = true } +image = { version = "0.24", optional = true } +memmap2 = { version = "0.5", optional = true } +memfd = { version = "0.4", optional = true } +uds = { version = "0.2", optional = true } [target.'cfg(target_os = "macos")'.dependencies] cocoa = "0.24" @@ -56,6 +60,7 @@ v1_14 = ["gst/v1_14"] pango-cairo = ["pango", "pangocairo", "cairo-rs"] overlay-composition = ["pango", "pangocairo", "cairo-rs"] gl = ["gst-gl", "gl_generator", "glutin"] +allocators = ["gst-allocators", "memmap2", "memfd", "uds"] [[bin]] name = "appsink" @@ -178,3 +183,7 @@ name = "video_converter" [[bin]] name = "thumbnail" required-features = ["image"] + +[[bin]] +name = "fd_allocator" +required-features = ["allocators"] \ No newline at end of file diff --git a/examples/src/bin/fd_allocator.rs b/examples/src/bin/fd_allocator.rs new file mode 100644 index 000000000..88ef1a513 --- /dev/null +++ b/examples/src/bin/fd_allocator.rs @@ -0,0 +1,518 @@ +// This example demonstrates the use of the FdMemory allocator. +// It operates the following two pipelines: + +// sender: {videotestsrc} - {appsink} +// receiver: {appsrc} - {FdMemoryVideoFilter} - {videoconvert} - {queue} - {autovideosink} + +// The sender creates shared memory files from the appsink which are sent +// to the receiver using a unix domain socket. +// The receiver creates buffers in the appsrc using the FdMemoryAllocator from +// the received file descriptors. + +// Additional to demonstrating how the FdMemoryAllocator can be used to share +// file descriptors the example implements a custom VideoFilter demonstrating +// how the file descriptor of FdMemory can be accessed in a pipeline. +// Note that instead of manual mapping the file descriptor it is also possible +// to use map_writable, which will also map the file descriptor. +use futures::StreamExt; +use gst::{element_error, prelude::*}; + +use anyhow::Error; +use derive_more::{Display, Error}; + +use memmap2::MmapMut; +use uds::UnixStreamExt; + +use std::{ + os::unix::{net::UnixStream, prelude::AsRawFd}, + sync::{Arc, Mutex}, +}; + +#[path = "../examples-common.rs"] +mod examples_common; + +#[derive(Debug, Display, Error)] +#[display(fmt = "Missing element {}", _0)] +struct MissingElement(#[error(not(source))] &'static str); + +#[derive(Debug, Display, Error)] +#[display(fmt = "Received error from {}: {} (debug: {:?})", src, error, debug)] +struct ErrorMessage { + src: String, + error: String, + debug: Option, + source: glib::Error, +} + +fn create_receiver_pipeline( + video_info: &gst_video::VideoInfo, + receiver: UnixStream, +) -> Result { + let caps = video_info.to_caps()?; + + let pipeline = gst::Pipeline::new(None); + let src = gst::ElementFactory::make("appsrc", None).map_err(|_| MissingElement("appsrc"))?; + let filter = video_filter::FdMemoryFadeInVideoFilter::new()?.upcast::(); + let convert = gst::ElementFactory::make("videoconvert", None) + .map_err(|_| MissingElement("videoconvert"))?; + let queue = gst::ElementFactory::make("queue", None).map_err(|_| MissingElement("queue"))?; + let sink = gst::ElementFactory::make("autovideosink", None) + .map_err(|_| MissingElement("autovideosink"))?; + + src.downcast_ref::() + .ok_or_else(|| anyhow::anyhow!("is not a appsrc"))? + .set_caps(Some(&caps)); + + pipeline.add_many(&[&src, &filter, &convert, &queue, &sink])?; + gst::Element::link_many(&[&src, &filter, &convert, &queue, &sink])?; + + let appsrc = src + .downcast::() + .map_err(|_| anyhow::anyhow!("is not a appsrc"))?; + + appsrc.set_do_timestamp(true); + appsrc.set_is_live(true); + + let fd_allocator = gst_allocators::FdAllocator::new(); + let video_info = video_info.clone(); + let mut fd_buf = [-1; 253]; + + appsrc.set_callbacks( + gst_app::AppSrcCallbacks::builder() + .need_data(move |appsrc, _| { + // Read the next fds from the socket, if 0 + // is returned the sender has closed the stream + // which is handled as EOS here. + let fds = match receiver.recv_fds(&mut [0u8; 1], &mut fd_buf) { + Ok((_, 0)) => { + let _ = appsrc.end_of_stream(); + return; + } + Ok((_, fds)) => fds, + Err(err) => { + gst::error_msg!( + gst::StreamError::Failed, + ("failed to receive fds: {}", err) + ); + return; + } + }; + + for fd in &fd_buf[0..fds] { + // Allocate a new FdMemory for the received file descriptor. + // It is important that the size matches the size of the + // actual backing storage. In this example we just use the + // same video info in both sides, sending and receiving. + // Pass FdMemoryFlags::NONE to make the FdMemory take + // ownership of the passed file descriptor. The file descriptor + // will be closed when the memory is released. + let memory = unsafe { + fd_allocator.alloc( + *fd, + video_info.size(), + gst_allocators::FdMemoryFlags::NONE, + ) + }; + let mut buffer = gst::Buffer::new(); + let buffer_mut = buffer.make_mut(); + buffer_mut.append_memory(memory); + let _ = appsrc.push_buffer(buffer); + } + }) + .build(), + ); + + Ok(pipeline) +} + +fn create_sender_pipeline( + video_info: &gst_video::VideoInfo, + sender: UnixStream, +) -> Result { + let sender = Arc::new(Mutex::new(sender)); + let caps = video_info.to_caps()?; + + let pipeline = gst::Pipeline::new(None); + let src = gst::ElementFactory::make("videotestsrc", None) + .map_err(|_| MissingElement("videotestsrc"))?; + let sink = gst::ElementFactory::make("appsink", None).map_err(|_| MissingElement("appsink"))?; + + src.set_property("num-buffers", 250i32); + + sink.downcast_ref::() + .ok_or_else(|| anyhow::anyhow!("is not a appsink"))? + .set_caps(Some(&caps)); + + pipeline.add_many(&[&src, &sink])?; + gst::Element::link_many(&[&src, &sink])?; + + let appsink = sink + .downcast::() + .map_err(|_| anyhow::anyhow!("is not a appsink"))?; + + appsink.set_callbacks( + gst_app::AppSinkCallbacks::builder() + // Add a handler to the "eos" signal + .eos({ + let sender = sender.clone(); + move |_| { + // Close the sender part of the UnixSocket pair, this will automatically + // create a eos in the receiving part. + let _ = sender.lock().unwrap().shutdown(std::net::Shutdown::Write); + } + }) + // Add a handler to the "new-sample" signal. + .new_sample(move |appsink| { + // Pull the sample in question out of the appsink's buffer. + let sample = appsink.pull_sample().map_err(|_| gst::FlowError::Eos)?; + let buffer = sample.buffer().ok_or_else(|| { + element_error!( + appsink, + gst::ResourceError::Failed, + ("Failed to get buffer from appsink") + ); + + gst::FlowError::Error + })?; + + if buffer.n_memory() != 1 { + element_error!( + appsink, + gst::ResourceError::Failed, + ("Expected buffer with single memory") + ); + + return Err(gst::FlowError::Error); + } + + let mem = buffer.peek_memory(0); + + // We can use downcast_memory_ref to check if the provided + // memory is allocated by FdMemoryAllocator or a subtype of it. + // Note: This is not used in the example, we will always copy + // the memory to a new shared memory file. + if let Some(fd_memory) = mem.downcast_memory_ref::() { + // As we already got a fd we can just directly send it over the socket. + // NOTE: Synchronization is left out of this example, in a real world + // application access to the memory should be synchronized. + // For example wayland provides a release callback to signal that + // the memory is no longer in use. + sender + .lock() + .unwrap() + .send_fds(&[0u8; 1], &[fd_memory.fd()]) + .map_err(|_| { + element_error!( + appsink, + gst::ResourceError::Failed, + ("Failed to send fd over unix stream") + ); + + gst::FlowError::Error + })?; + } else { + // At this point, buffer is only a reference to an existing memory region somewhere. + // When we want to access its content, we have to map it while requesting the required + // mode of access (read, read/write). + // This type of abstraction is necessary, because the buffer in question might not be + // on the machine's main memory itself, but rather in the GPU's memory. + // So mapping the buffer makes the underlying memory region accessible to us. + // See: https://gstreamer.freedesktop.org/documentation/plugin-development/advanced/allocation.html + let map = buffer.map_readable().map_err(|_| { + element_error!( + appsink, + gst::ResourceError::Failed, + ("Failed to map buffer readable") + ); + + gst::FlowError::Error + })?; + + // Note: To simplify this example we always create a new shared memory file instead + // of using a pool of buffers. When using a pool we need to make sure access to the + // file is synchronized. + let opts = memfd::MemfdOptions::default().allow_sealing(true); + let mfd = opts.create("gst-examples").map_err(|err| { + element_error!( + appsink, + gst::ResourceError::Failed, + ("Failed to allocated fd: {}", err) + ); + + gst::FlowError::Error + })?; + + mfd.as_file().set_len(map.size() as u64).map_err(|err| { + element_error!( + appsink, + gst::ResourceError::Failed, + ("Failed to resize fd memory: {}", err) + ); + + gst::FlowError::Error + })?; + + let mut seals = memfd::SealsHashSet::new(); + seals.insert(memfd::FileSeal::SealShrink); + seals.insert(memfd::FileSeal::SealGrow); + mfd.add_seals(&seals).map_err(|err| { + element_error!( + appsink, + gst::ResourceError::Failed, + ("Failed to add fd seals: {}", err) + ); + + gst::FlowError::Error + })?; + + mfd.add_seal(memfd::FileSeal::SealSeal).map_err(|err| { + element_error!( + appsink, + gst::ResourceError::Failed, + ("Failed to add fd seals: {}", err) + ); + + gst::FlowError::Error + })?; + + unsafe { + let mut mmap = MmapMut::map_mut(mfd.as_file()).map_err(|_| { + element_error!( + appsink, + gst::ResourceError::Failed, + ("Failed to mmap fd") + ); + + gst::FlowError::Error + })?; + + mmap.copy_from_slice(map.as_slice()); + }; + + sender + .lock() + .unwrap() + .send_fds(&[0u8; 1], &[mfd.as_raw_fd()]) + .map_err(|_| { + element_error!( + appsink, + gst::ResourceError::Failed, + ("Failed to send fd over unix stream") + ); + + gst::FlowError::Error + })?; + }; + + Ok(gst::FlowSuccess::Ok) + }) + .build(), + ); + + Ok(pipeline) +} + +async fn message_loop(bus: gst::Bus) { + let mut messages = bus.stream(); + + while let Some(msg) = messages.next().await { + use gst::MessageView; + + // Determine whether we want to quit: on EOS or error message + // we quit, otherwise simply continue. + match msg.view() { + MessageView::Eos(..) => break, + MessageView::Error(err) => { + println!( + "Error from {:?}: {} ({:?})", + err.src().map(|s| s.path_string()), + err.error(), + err.debug() + ); + break; + } + _ => (), + }; + } +} + +fn example_main() -> Result<(), Error> { + gst::init()?; + + let video_info = gst_video::VideoInfo::builder(gst_video::VideoFormat::Bgra, 1920, 1080) + .fps(gst::Fraction::new(30, 1)) + .build()?; + + let (sender, receiver) = std::os::unix::net::UnixStream::pair()?; + let sender_pipeline = create_sender_pipeline(&video_info, sender)?; + let receiver_pipeline = create_receiver_pipeline(&video_info, receiver)?; + + let receiver_bus = receiver_pipeline.bus().expect("pipeline without bus"); + receiver_pipeline.set_state(gst::State::Playing)?; + + let sender_bus = sender_pipeline.bus().expect("pipeline without bus"); + sender_pipeline.set_state(gst::State::Playing)?; + + futures::executor::block_on(futures::future::join( + message_loop(sender_bus), + message_loop(receiver_bus), + )); + + sender_pipeline.set_state(gst::State::Null)?; + receiver_pipeline.set_state(gst::State::Null)?; + + Ok(()) +} + +fn main() { + // tutorials_common::run is only required to set up the application environment on macOS + // (but not necessary in normal Cocoa applications where this is set up automatically) + match examples_common::run(example_main) { + Ok(r) => r, + Err(e) => eprintln!("Error! {}", e), + } +} + +// The purpose of this custom video filter is to demonstrate how +// the file descriptor of a FdMemory can be accessed. +mod video_filter { + use anyhow::Error; + + glib::wrapper! { + pub struct FdMemoryFadeInVideoFilter(ObjectSubclass) @extends gst_video::VideoFilter, gst_base::BaseTransform, gst::Element, gst::Object; + } + + impl FdMemoryFadeInVideoFilter { + pub fn new() -> Result { + Ok(glib::Object::builder().build()?) + } + } + mod imp { + use std::{cmp, mem::ManuallyDrop, os::unix::prelude::FromRawFd}; + + use anyhow::Error; + use glib::subclass::{object::ObjectImpl, types::ObjectSubclass}; + use gst::{ + subclass::prelude::{ElementImpl, GstObjectImpl}, + PadDirection, PadPresence, PadTemplate, + }; + use gst_app::gst_base::subclass::BaseTransformMode; + use gst_video::{ + subclass::prelude::{BaseTransformImpl, VideoFilterImpl}, + VideoFrameRef, + }; + use memmap2::MmapMut; + use once_cell::sync::Lazy; + + static CAT: Lazy = Lazy::new(|| { + gst::DebugCategory::new( + "fdmemoryfilter", + gst::DebugColorFlags::empty(), + Some("Example FdMemory filter"), + ) + }); + + #[derive(Debug, Default)] + pub struct FdMemoryFadeInVideoFilter; + + impl FdMemoryFadeInVideoFilter { + fn transform_fd_mem_ip( + &self, + frame: &mut VideoFrameRef<&mut gst::BufferRef>, + ) -> Result<(), Error> { + let buffer = frame.buffer(); + if buffer.n_memory() != 1 { + return Err(anyhow::anyhow!( + "only buffers with single memory are supported" + )); + } + let mem = buffer.peek_memory(0); + if !mem.is_memory_type::() { + return Err(anyhow::anyhow!("only fd memory is supported")); + } + + let timestamp = buffer.pts().unwrap(); + let factor = (timestamp.nseconds() as f64 + / (5 * gst::ClockTime::SECOND).nseconds() as f64) + .min(1.0f64); + + // If the fade-in has finished return early + if factor >= 1.0f64 { + return Ok(()); + } + + let fd = mem + .downcast_memory_ref::() + .unwrap() + .fd(); + + unsafe { + // We wrap the Memmfd in ManuallyDrop here because from_raw_fd takes ownership of + // the file descriptor which would close it on drop + // + // see: https://github.com/lucab/memfd-rs/issues/29 + let mfd = ManuallyDrop::new(memfd::Memfd::from_raw_fd(fd)); + let mut mmap = MmapMut::map_mut(mfd.as_file())?; + + for pixel in mmap.chunks_exact_mut(4) { + pixel[0] = cmp::max(0, cmp::min(255, (pixel[0] as f64 * factor) as u8)); + pixel[1] = cmp::max(0, cmp::min(255, (pixel[1] as f64 * factor) as u8)); + pixel[2] = cmp::max(0, cmp::min(255, (pixel[2] as f64 * factor) as u8)); + } + } + + Ok(()) + } + } + + impl ElementImpl for FdMemoryFadeInVideoFilter { + fn pad_templates() -> &'static [PadTemplate] { + static PAD_TEMPLATES: Lazy> = Lazy::new(|| { + let caps = gst::Caps::builder("video/x-raw") + .field("format", "BGRA") + .build(); + vec![ + PadTemplate::new("sink", PadDirection::Sink, PadPresence::Always, &caps) + .unwrap(), + PadTemplate::new("src", PadDirection::Src, PadPresence::Always, &caps) + .unwrap(), + ] + }); + + PAD_TEMPLATES.as_ref() + } + } + + impl BaseTransformImpl for FdMemoryFadeInVideoFilter { + const MODE: BaseTransformMode = BaseTransformMode::AlwaysInPlace; + const PASSTHROUGH_ON_SAME_CAPS: bool = false; + const TRANSFORM_IP_ON_PASSTHROUGH: bool = true; + } + + impl VideoFilterImpl for FdMemoryFadeInVideoFilter { + fn transform_frame_ip( + &self, + element: &Self::Type, + frame: &mut VideoFrameRef<&mut gst::BufferRef>, + ) -> Result { + self.transform_fd_mem_ip(frame).map_err(|err| { + gst::error!(CAT, obj: element, "Failed to transform frame`: {}", err); + gst::FlowError::Error + })?; + + Ok(gst::FlowSuccess::Ok) + } + } + + impl ObjectImpl for FdMemoryFadeInVideoFilter {} + + impl GstObjectImpl for FdMemoryFadeInVideoFilter {} + + #[glib::object_subclass] + impl ObjectSubclass for FdMemoryFadeInVideoFilter { + const NAME: &'static str = "FdMemoryVideoFilter"; + type Type = super::FdMemoryFadeInVideoFilter; + type ParentType = gst_video::VideoFilter; + } + } +}