Extend examples to showcase the use of the FdMemoryAllocator

This commit is contained in:
Christian Meissl 2022-03-01 20:19:50 +01:00 committed by Sebastian Dröge
parent a8dc789ef8
commit 163130cad7
2 changed files with 532 additions and 5 deletions

View file

@ -23,6 +23,7 @@ ges = { package = "gstreamer-editing-services", path = "../gstreamer-editing-ser
gst-sdp = { package = "gstreamer-sdp", path = "../gstreamer-sdp", optional = true }
gst-rtsp = { package = "gstreamer-rtsp", path = "../gstreamer-rtsp", optional = true }
gst-rtsp-server = { package = "gstreamer-rtsp-server", path = "../gstreamer-rtsp-server", optional = true }
gst-allocators = { package = "gstreamer-allocators", path = "../gstreamer-allocators", optional = true }
gtk = { git = "https://github.com/gtk-rs/gtk3-rs", optional = true }
gdk = { git = "https://github.com/gtk-rs/gtk3-rs", optional = true }
gio = { git = "https://github.com/gtk-rs/gtk-rs-core", optional = true }
@ -36,6 +37,9 @@ pangocairo = { git = "https://github.com/gtk-rs/gtk-rs-core" , optional = true }
glutin = { version = "0.28", optional = true }
once_cell = "1.0"
image = { version = "0.24", optional = true }
memmap2 = { version = "0.5", optional = true }
memfd = { version = "0.4", optional = true }
uds = { version = "0.2", optional = true }
[target.'cfg(target_os = "macos")'.dependencies]
cocoa = "0.24"
@ -56,6 +60,7 @@ v1_14 = ["gst/v1_14"]
pango-cairo = ["pango", "pangocairo", "cairo-rs"]
overlay-composition = ["pango", "pangocairo", "cairo-rs"]
gl = ["gst-gl", "gl_generator", "glutin"]
allocators = ["gst-allocators", "memmap2", "memfd", "uds"]
[[bin]]
name = "appsink"
@ -178,3 +183,7 @@ name = "video_converter"
[[bin]]
name = "thumbnail"
required-features = ["image"]
[[bin]]
name = "fd_allocator"
required-features = ["allocators"]

View file

@ -0,0 +1,518 @@
// This example demonstrates the use of the FdMemory allocator.
// It operates the following two pipelines:
// sender: {videotestsrc} - {appsink}
// receiver: {appsrc} - {FdMemoryVideoFilter} - {videoconvert} - {queue} - {autovideosink}
// The sender creates shared memory files from the appsink which are sent
// to the receiver using a unix domain socket.
// The receiver creates buffers in the appsrc using the FdMemoryAllocator from
// the received file descriptors.
// Additional to demonstrating how the FdMemoryAllocator can be used to share
// file descriptors the example implements a custom VideoFilter demonstrating
// how the file descriptor of FdMemory can be accessed in a pipeline.
// Note that instead of manual mapping the file descriptor it is also possible
// to use map_writable, which will also map the file descriptor.
use futures::StreamExt;
use gst::{element_error, prelude::*};
use anyhow::Error;
use derive_more::{Display, Error};
use memmap2::MmapMut;
use uds::UnixStreamExt;
use std::{
os::unix::{net::UnixStream, prelude::AsRawFd},
sync::{Arc, Mutex},
};
#[path = "../examples-common.rs"]
mod examples_common;
#[derive(Debug, Display, Error)]
#[display(fmt = "Missing element {}", _0)]
struct MissingElement(#[error(not(source))] &'static str);
#[derive(Debug, Display, Error)]
#[display(fmt = "Received error from {}: {} (debug: {:?})", src, error, debug)]
struct ErrorMessage {
src: String,
error: String,
debug: Option<String>,
source: glib::Error,
}
fn create_receiver_pipeline(
video_info: &gst_video::VideoInfo,
receiver: UnixStream,
) -> Result<gst::Pipeline, Error> {
let caps = video_info.to_caps()?;
let pipeline = gst::Pipeline::new(None);
let src = gst::ElementFactory::make("appsrc", None).map_err(|_| MissingElement("appsrc"))?;
let filter = video_filter::FdMemoryFadeInVideoFilter::new()?.upcast::<gst::Element>();
let convert = gst::ElementFactory::make("videoconvert", None)
.map_err(|_| MissingElement("videoconvert"))?;
let queue = gst::ElementFactory::make("queue", None).map_err(|_| MissingElement("queue"))?;
let sink = gst::ElementFactory::make("autovideosink", None)
.map_err(|_| MissingElement("autovideosink"))?;
src.downcast_ref::<gst_app::AppSrc>()
.ok_or_else(|| anyhow::anyhow!("is not a appsrc"))?
.set_caps(Some(&caps));
pipeline.add_many(&[&src, &filter, &convert, &queue, &sink])?;
gst::Element::link_many(&[&src, &filter, &convert, &queue, &sink])?;
let appsrc = src
.downcast::<gst_app::AppSrc>()
.map_err(|_| anyhow::anyhow!("is not a appsrc"))?;
appsrc.set_do_timestamp(true);
appsrc.set_is_live(true);
let fd_allocator = gst_allocators::FdAllocator::new();
let video_info = video_info.clone();
let mut fd_buf = [-1; 253];
appsrc.set_callbacks(
gst_app::AppSrcCallbacks::builder()
.need_data(move |appsrc, _| {
// Read the next fds from the socket, if 0
// is returned the sender has closed the stream
// which is handled as EOS here.
let fds = match receiver.recv_fds(&mut [0u8; 1], &mut fd_buf) {
Ok((_, 0)) => {
let _ = appsrc.end_of_stream();
return;
}
Ok((_, fds)) => fds,
Err(err) => {
gst::error_msg!(
gst::StreamError::Failed,
("failed to receive fds: {}", err)
);
return;
}
};
for fd in &fd_buf[0..fds] {
// Allocate a new FdMemory for the received file descriptor.
// It is important that the size matches the size of the
// actual backing storage. In this example we just use the
// same video info in both sides, sending and receiving.
// Pass FdMemoryFlags::NONE to make the FdMemory take
// ownership of the passed file descriptor. The file descriptor
// will be closed when the memory is released.
let memory = unsafe {
fd_allocator.alloc(
*fd,
video_info.size(),
gst_allocators::FdMemoryFlags::NONE,
)
};
let mut buffer = gst::Buffer::new();
let buffer_mut = buffer.make_mut();
buffer_mut.append_memory(memory);
let _ = appsrc.push_buffer(buffer);
}
})
.build(),
);
Ok(pipeline)
}
fn create_sender_pipeline(
video_info: &gst_video::VideoInfo,
sender: UnixStream,
) -> Result<gst::Pipeline, Error> {
let sender = Arc::new(Mutex::new(sender));
let caps = video_info.to_caps()?;
let pipeline = gst::Pipeline::new(None);
let src = gst::ElementFactory::make("videotestsrc", None)
.map_err(|_| MissingElement("videotestsrc"))?;
let sink = gst::ElementFactory::make("appsink", None).map_err(|_| MissingElement("appsink"))?;
src.set_property("num-buffers", 250i32);
sink.downcast_ref::<gst_app::AppSink>()
.ok_or_else(|| anyhow::anyhow!("is not a appsink"))?
.set_caps(Some(&caps));
pipeline.add_many(&[&src, &sink])?;
gst::Element::link_many(&[&src, &sink])?;
let appsink = sink
.downcast::<gst_app::AppSink>()
.map_err(|_| anyhow::anyhow!("is not a appsink"))?;
appsink.set_callbacks(
gst_app::AppSinkCallbacks::builder()
// Add a handler to the "eos" signal
.eos({
let sender = sender.clone();
move |_| {
// Close the sender part of the UnixSocket pair, this will automatically
// create a eos in the receiving part.
let _ = sender.lock().unwrap().shutdown(std::net::Shutdown::Write);
}
})
// Add a handler to the "new-sample" signal.
.new_sample(move |appsink| {
// Pull the sample in question out of the appsink's buffer.
let sample = appsink.pull_sample().map_err(|_| gst::FlowError::Eos)?;
let buffer = sample.buffer().ok_or_else(|| {
element_error!(
appsink,
gst::ResourceError::Failed,
("Failed to get buffer from appsink")
);
gst::FlowError::Error
})?;
if buffer.n_memory() != 1 {
element_error!(
appsink,
gst::ResourceError::Failed,
("Expected buffer with single memory")
);
return Err(gst::FlowError::Error);
}
let mem = buffer.peek_memory(0);
// We can use downcast_memory_ref to check if the provided
// memory is allocated by FdMemoryAllocator or a subtype of it.
// Note: This is not used in the example, we will always copy
// the memory to a new shared memory file.
if let Some(fd_memory) = mem.downcast_memory_ref::<gst_allocators::FdMemory>() {
// As we already got a fd we can just directly send it over the socket.
// NOTE: Synchronization is left out of this example, in a real world
// application access to the memory should be synchronized.
// For example wayland provides a release callback to signal that
// the memory is no longer in use.
sender
.lock()
.unwrap()
.send_fds(&[0u8; 1], &[fd_memory.fd()])
.map_err(|_| {
element_error!(
appsink,
gst::ResourceError::Failed,
("Failed to send fd over unix stream")
);
gst::FlowError::Error
})?;
} else {
// At this point, buffer is only a reference to an existing memory region somewhere.
// When we want to access its content, we have to map it while requesting the required
// mode of access (read, read/write).
// This type of abstraction is necessary, because the buffer in question might not be
// on the machine's main memory itself, but rather in the GPU's memory.
// So mapping the buffer makes the underlying memory region accessible to us.
// See: https://gstreamer.freedesktop.org/documentation/plugin-development/advanced/allocation.html
let map = buffer.map_readable().map_err(|_| {
element_error!(
appsink,
gst::ResourceError::Failed,
("Failed to map buffer readable")
);
gst::FlowError::Error
})?;
// Note: To simplify this example we always create a new shared memory file instead
// of using a pool of buffers. When using a pool we need to make sure access to the
// file is synchronized.
let opts = memfd::MemfdOptions::default().allow_sealing(true);
let mfd = opts.create("gst-examples").map_err(|err| {
element_error!(
appsink,
gst::ResourceError::Failed,
("Failed to allocated fd: {}", err)
);
gst::FlowError::Error
})?;
mfd.as_file().set_len(map.size() as u64).map_err(|err| {
element_error!(
appsink,
gst::ResourceError::Failed,
("Failed to resize fd memory: {}", err)
);
gst::FlowError::Error
})?;
let mut seals = memfd::SealsHashSet::new();
seals.insert(memfd::FileSeal::SealShrink);
seals.insert(memfd::FileSeal::SealGrow);
mfd.add_seals(&seals).map_err(|err| {
element_error!(
appsink,
gst::ResourceError::Failed,
("Failed to add fd seals: {}", err)
);
gst::FlowError::Error
})?;
mfd.add_seal(memfd::FileSeal::SealSeal).map_err(|err| {
element_error!(
appsink,
gst::ResourceError::Failed,
("Failed to add fd seals: {}", err)
);
gst::FlowError::Error
})?;
unsafe {
let mut mmap = MmapMut::map_mut(mfd.as_file()).map_err(|_| {
element_error!(
appsink,
gst::ResourceError::Failed,
("Failed to mmap fd")
);
gst::FlowError::Error
})?;
mmap.copy_from_slice(map.as_slice());
};
sender
.lock()
.unwrap()
.send_fds(&[0u8; 1], &[mfd.as_raw_fd()])
.map_err(|_| {
element_error!(
appsink,
gst::ResourceError::Failed,
("Failed to send fd over unix stream")
);
gst::FlowError::Error
})?;
};
Ok(gst::FlowSuccess::Ok)
})
.build(),
);
Ok(pipeline)
}
async fn message_loop(bus: gst::Bus) {
let mut messages = bus.stream();
while let Some(msg) = messages.next().await {
use gst::MessageView;
// Determine whether we want to quit: on EOS or error message
// we quit, otherwise simply continue.
match msg.view() {
MessageView::Eos(..) => break,
MessageView::Error(err) => {
println!(
"Error from {:?}: {} ({:?})",
err.src().map(|s| s.path_string()),
err.error(),
err.debug()
);
break;
}
_ => (),
};
}
}
fn example_main() -> Result<(), Error> {
gst::init()?;
let video_info = gst_video::VideoInfo::builder(gst_video::VideoFormat::Bgra, 1920, 1080)
.fps(gst::Fraction::new(30, 1))
.build()?;
let (sender, receiver) = std::os::unix::net::UnixStream::pair()?;
let sender_pipeline = create_sender_pipeline(&video_info, sender)?;
let receiver_pipeline = create_receiver_pipeline(&video_info, receiver)?;
let receiver_bus = receiver_pipeline.bus().expect("pipeline without bus");
receiver_pipeline.set_state(gst::State::Playing)?;
let sender_bus = sender_pipeline.bus().expect("pipeline without bus");
sender_pipeline.set_state(gst::State::Playing)?;
futures::executor::block_on(futures::future::join(
message_loop(sender_bus),
message_loop(receiver_bus),
));
sender_pipeline.set_state(gst::State::Null)?;
receiver_pipeline.set_state(gst::State::Null)?;
Ok(())
}
fn main() {
// tutorials_common::run is only required to set up the application environment on macOS
// (but not necessary in normal Cocoa applications where this is set up automatically)
match examples_common::run(example_main) {
Ok(r) => r,
Err(e) => eprintln!("Error! {}", e),
}
}
// The purpose of this custom video filter is to demonstrate how
// the file descriptor of a FdMemory can be accessed.
mod video_filter {
use anyhow::Error;
glib::wrapper! {
pub struct FdMemoryFadeInVideoFilter(ObjectSubclass<imp::FdMemoryFadeInVideoFilter>) @extends gst_video::VideoFilter, gst_base::BaseTransform, gst::Element, gst::Object;
}
impl FdMemoryFadeInVideoFilter {
pub fn new() -> Result<Self, Error> {
Ok(glib::Object::builder().build()?)
}
}
mod imp {
use std::{cmp, mem::ManuallyDrop, os::unix::prelude::FromRawFd};
use anyhow::Error;
use glib::subclass::{object::ObjectImpl, types::ObjectSubclass};
use gst::{
subclass::prelude::{ElementImpl, GstObjectImpl},
PadDirection, PadPresence, PadTemplate,
};
use gst_app::gst_base::subclass::BaseTransformMode;
use gst_video::{
subclass::prelude::{BaseTransformImpl, VideoFilterImpl},
VideoFrameRef,
};
use memmap2::MmapMut;
use once_cell::sync::Lazy;
static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
gst::DebugCategory::new(
"fdmemoryfilter",
gst::DebugColorFlags::empty(),
Some("Example FdMemory filter"),
)
});
#[derive(Debug, Default)]
pub struct FdMemoryFadeInVideoFilter;
impl FdMemoryFadeInVideoFilter {
fn transform_fd_mem_ip(
&self,
frame: &mut VideoFrameRef<&mut gst::BufferRef>,
) -> Result<(), Error> {
let buffer = frame.buffer();
if buffer.n_memory() != 1 {
return Err(anyhow::anyhow!(
"only buffers with single memory are supported"
));
}
let mem = buffer.peek_memory(0);
if !mem.is_memory_type::<gst_allocators::FdMemory>() {
return Err(anyhow::anyhow!("only fd memory is supported"));
}
let timestamp = buffer.pts().unwrap();
let factor = (timestamp.nseconds() as f64
/ (5 * gst::ClockTime::SECOND).nseconds() as f64)
.min(1.0f64);
// If the fade-in has finished return early
if factor >= 1.0f64 {
return Ok(());
}
let fd = mem
.downcast_memory_ref::<gst_allocators::FdMemory>()
.unwrap()
.fd();
unsafe {
// We wrap the Memmfd in ManuallyDrop here because from_raw_fd takes ownership of
// the file descriptor which would close it on drop
//
// see: https://github.com/lucab/memfd-rs/issues/29
let mfd = ManuallyDrop::new(memfd::Memfd::from_raw_fd(fd));
let mut mmap = MmapMut::map_mut(mfd.as_file())?;
for pixel in mmap.chunks_exact_mut(4) {
pixel[0] = cmp::max(0, cmp::min(255, (pixel[0] as f64 * factor) as u8));
pixel[1] = cmp::max(0, cmp::min(255, (pixel[1] as f64 * factor) as u8));
pixel[2] = cmp::max(0, cmp::min(255, (pixel[2] as f64 * factor) as u8));
}
}
Ok(())
}
}
impl ElementImpl for FdMemoryFadeInVideoFilter {
fn pad_templates() -> &'static [PadTemplate] {
static PAD_TEMPLATES: Lazy<Vec<PadTemplate>> = Lazy::new(|| {
let caps = gst::Caps::builder("video/x-raw")
.field("format", "BGRA")
.build();
vec![
PadTemplate::new("sink", PadDirection::Sink, PadPresence::Always, &caps)
.unwrap(),
PadTemplate::new("src", PadDirection::Src, PadPresence::Always, &caps)
.unwrap(),
]
});
PAD_TEMPLATES.as_ref()
}
}
impl BaseTransformImpl for FdMemoryFadeInVideoFilter {
const MODE: BaseTransformMode = BaseTransformMode::AlwaysInPlace;
const PASSTHROUGH_ON_SAME_CAPS: bool = false;
const TRANSFORM_IP_ON_PASSTHROUGH: bool = true;
}
impl VideoFilterImpl for FdMemoryFadeInVideoFilter {
fn transform_frame_ip(
&self,
element: &Self::Type,
frame: &mut VideoFrameRef<&mut gst::BufferRef>,
) -> Result<gst::FlowSuccess, gst::FlowError> {
self.transform_fd_mem_ip(frame).map_err(|err| {
gst::error!(CAT, obj: element, "Failed to transform frame`: {}", err);
gst::FlowError::Error
})?;
Ok(gst::FlowSuccess::Ok)
}
}
impl ObjectImpl for FdMemoryFadeInVideoFilter {}
impl GstObjectImpl for FdMemoryFadeInVideoFilter {}
#[glib::object_subclass]
impl ObjectSubclass for FdMemoryFadeInVideoFilter {
const NAME: &'static str = "FdMemoryVideoFilter";
type Type = super::FdMemoryFadeInVideoFilter;
type ParentType = gst_video::VideoFilter;
}
}
}