From a884a419db0e52a862269e0a4010f5605f23a613 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Sebastian=20Dr=C3=B6ge?=
Date: Mon, 6 Apr 2020 19:29:44 +0300
Subject: [PATCH] gstreamer/buffer: Add BufferCursor / BufferCursorRef helper structs

These implement Read/Seek or Write/Seek and allow reading, writing and
seeking in the buffer without merging the memories inside. The writer
also maps each memory write-only, unlike all other ways of accessing
buffer/memory data in a writable way, which have to map it read-write.

See https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/425
for a similar API proposal for GStreamer core.
---
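
A rough usage sketch of the owned cursor, for illustration only (it assumes
the crate is imported as gst and initialized, and uses the existing
Buffer::with_size just to have a writable buffer to point the cursor at):

    use std::io::{Read, Seek, SeekFrom, Write};

    gst::init().unwrap();

    // A freshly allocated buffer is writable, so the writable cursor is available.
    let buffer = gst::Buffer::with_size(8).unwrap();

    // Write through the cursor; each memory is mapped write-only on demand.
    let mut cursor = buffer.into_buffer_cursor_writable().unwrap();
    cursor.write_all(&[1u8; 8]).unwrap();
    cursor.seek(SeekFrom::Start(4)).unwrap();
    cursor.write_all(&[2u8; 4]).unwrap();

    // Take the buffer back and read it again, memory by memory, without merging.
    let mut cursor = cursor.into_buffer().into_buffer_cursor_readable();
    let mut data = [0u8; 8];
    cursor.read_exact(&mut data).unwrap();
    assert_eq!(data, [1, 1, 1, 1, 2, 2, 2, 2]);

Seeking only unmaps the currently mapped memory and recomputes the memory
index and offsets; nothing is copied or merged until data is actually read
or written.
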
 gstreamer/src/buffer.rs | 651 ++++++++++++++++++++++++++++++++++++++++
 gstreamer/src/lib.rs    |   2 +-
 2 files changed, 652 insertions(+), 1 deletion(-)

diff --git a/gstreamer/src/buffer.rs b/gstreamer/src/buffer.rs
index 9cb134104..ae59041e0 100644
--- a/gstreamer/src/buffer.rs
+++ b/gstreamer/src/buffer.rs
@@ -7,6 +7,7 @@
 // except according to those terms.
 
 use std::fmt;
+use std::io;
 use std::marker::PhantomData;
 use std::mem;
 use std::ops;
@@ -50,6 +51,27 @@ pub struct MappedBuffer<T> {
     phantom: PhantomData<T>,
 }
 
+pub struct BufferCursor<T> {
+    buffer: Option<Buffer>,
+    size: u64,
+    num_mem: u32,
+    cur_mem_idx: u32,
+    cur_offset: u64,
+    cur_mem_offset: usize,
+    map_info: gst_sys::GstMapInfo,
+    phantom: PhantomData<T>,
+}
+
+pub struct BufferCursorRef<T> {
+    buffer: T,
+    size: u64,
+    num_mem: u32,
+    cur_mem_idx: u32,
+    cur_offset: u64,
+    cur_mem_offset: usize,
+    map_info: gst_sys::GstMapInfo,
+}
+
 impl Buffer {
     pub fn new() -> Self {
         assert_initialized_main_thread!();
@@ -159,6 +181,14 @@ impl Buffer {
         }
     }
 
+    pub fn into_buffer_cursor_readable(self) -> BufferCursor<Readable> {
+        BufferCursor::new_readable(self)
+    }
+
+    pub fn into_buffer_cursor_writable(self) -> Result<BufferCursor<Writable>, glib::BoolError> {
+        BufferCursor::new_writable(self)
+    }
+
     pub fn append(&mut self, other: Self) {
         skip_assert_initialized!();
         unsafe {
@@ -621,6 +651,16 @@ impl BufferRef {
     pub fn iter_memories_owned(&self) -> IterOwned {
         IterOwned::new(self)
     }
+
+    pub fn as_buffer_cursor_ref_readable<'a>(&'a self) -> BufferCursorRef<&'a BufferRef> {
+        BufferCursorRef::new_readable(self)
+    }
+
+    pub fn as_buffer_cursor_ref_writable<'a>(
+        &'a mut self,
+    ) -> Result<BufferCursorRef<&'a mut BufferRef>, glib::BoolError> {
+        BufferCursorRef::new_writable(self)
+    }
 }
 
 macro_rules! define_meta_iter(
@@ -1020,6 +1060,617 @@ impl<T> Eq for MappedBuffer<T> {}
 
 unsafe impl<T> Send for MappedBuffer<T> {}
 unsafe impl<T> Sync for MappedBuffer<T> {}
 
+impl<T> fmt::Debug for BufferCursor<T> {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        f.debug_struct("BufferCursor")
+            .field("buffer", &self.buffer)
+            .field("size", &self.size)
+            .field("num_mem", &self.num_mem)
+            .field("cur_mem_idx", &self.cur_mem_idx)
+            .field("cur_offset", &self.cur_offset)
+            .field("cur_mem_offset", &self.cur_mem_offset)
+            .field("map_info", &self.map_info)
+            .finish()
+    }
+}
+
+impl<T> Drop for BufferCursor<T> {
+    fn drop(&mut self) {
+        if !self.map_info.memory.is_null() {
+            unsafe {
+                gst_sys::gst_memory_unmap(self.map_info.memory, &mut self.map_info);
+            }
+        }
+    }
+}
+
+impl io::Read for BufferCursor<Readable> {
+    fn read(&mut self, mut data: &mut [u8]) -> Result<usize, io::Error> {
+        let mut copied = 0;
+
+        while !data.is_empty() && self.cur_mem_idx < self.num_mem {
+            // Map memory if needed. cur_mem_idx, cur_mem_offset and cur_offset are required to be
+            // set correctly here already (from constructor, seek and the bottom of the loop)
+            if self.map_info.memory.is_null() {
+                unsafe {
+                    let memory = gst_sys::gst_buffer_peek_memory(
+                        self.buffer.as_ref().unwrap().as_mut_ptr(),
+                        self.cur_mem_idx,
+                    );
+                    assert!(!memory.is_null());
+
+                    if gst_sys::gst_memory_map(memory, &mut self.map_info, gst_sys::GST_MAP_READ)
+                        == glib_sys::GFALSE
+                    {
+                        return Err(io::Error::new(
+                            io::ErrorKind::InvalidData,
+                            "Failed to map memory readable",
+                        ));
+                    }
+                }
+
+                assert!(self.cur_mem_offset < self.map_info.size);
+            }
+
+            assert!(!self.map_info.memory.is_null());
+
+            // Copy all data we can currently copy
+            let data_left = self.map_info.size - self.cur_mem_offset;
+            let to_copy = std::cmp::min(data.len(), data_left);
+            unsafe {
+                ptr::copy_nonoverlapping(
+                    (self.map_info.data as *const u8).add(self.cur_mem_offset),
+                    data.as_mut_ptr(),
+                    to_copy,
+                );
+            }
+            copied += to_copy;
+            self.cur_offset += to_copy as u64;
+            self.cur_mem_offset += to_copy;
+            data = &mut data[to_copy..];
+
+            // If we're at the end of the current memory, unmap and advance to the next memory
+            if self.cur_mem_offset == self.map_info.size {
+                unsafe {
+                    gst_sys::gst_memory_unmap(self.map_info.memory, &mut self.map_info);
+                }
+                self.map_info.memory = ptr::null_mut();
+                self.cur_mem_idx += 1;
+                self.cur_mem_offset = 0;
+            }
+        }
+
+        Ok(copied)
+    }
+}
+impl io::Write for BufferCursor<Writable> {
+    fn write(&mut self, mut data: &[u8]) -> Result<usize, io::Error> {
+        let mut copied = 0;
+
+        while !data.is_empty() && self.cur_mem_idx < self.num_mem {
+            // Map memory if needed. cur_mem_idx, cur_mem_offset and cur_offset are required to be
+            // set correctly here already (from constructor, seek and the bottom of the loop)
+            if self.map_info.memory.is_null() {
+                unsafe {
+                    let memory = gst_sys::gst_buffer_peek_memory(
+                        self.buffer.as_ref().unwrap().as_mut_ptr(),
+                        self.cur_mem_idx,
+                    );
+                    assert!(!memory.is_null());
+
+                    if gst_sys::gst_memory_map(memory, &mut self.map_info, gst_sys::GST_MAP_WRITE)
+                        == glib_sys::GFALSE
+                    {
+                        return Err(io::Error::new(
+                            io::ErrorKind::InvalidData,
+                            "Failed to map memory writable",
+                        ));
+                    }
+                }
+
+                assert!(self.cur_mem_offset < self.map_info.size);
+            }
+
+            assert!(!self.map_info.memory.is_null());
+
+            // Copy all data we can currently copy
+            let data_left = self.map_info.size - self.cur_mem_offset;
+            let to_copy = std::cmp::min(data.len(), data_left);
+            unsafe {
+                ptr::copy_nonoverlapping(
+                    data.as_ptr(),
+                    (self.map_info.data as *mut u8).add(self.cur_mem_offset),
+                    to_copy,
+                );
+            }
+            copied += to_copy;
+            self.cur_offset += to_copy as u64;
+            self.cur_mem_offset += to_copy;
+            data = &data[to_copy..];
+
+            // If we're at the end of the current memory, unmap and advance to the next memory
+            if self.cur_mem_offset == self.map_info.size {
+                unsafe {
+                    gst_sys::gst_memory_unmap(self.map_info.memory, &mut self.map_info);
+                }
+                self.map_info.memory = ptr::null_mut();
+                self.cur_mem_idx += 1;
+                self.cur_mem_offset = 0;
+            }
+        }
+
+        Ok(copied)
+    }
+
+    fn flush(&mut self) -> Result<(), io::Error> {
+        Ok(())
+    }
+}
+
+impl<T> io::Seek for BufferCursor<T> {
+    fn seek(&mut self, pos: io::SeekFrom) -> Result<u64, io::Error> {
+        if !self.map_info.memory.is_null() {
+            unsafe {
+                gst_sys::gst_memory_unmap(self.map_info.memory, &mut self.map_info);
+                self.map_info.memory = ptr::null_mut();
+            }
+        }
+
+        match pos {
+            io::SeekFrom::Start(off) => {
+                self.cur_offset = std::cmp::min(self.size, off);
+            }
+            io::SeekFrom::End(off) if off >= 0 => {
+                self.cur_offset = self.size;
+            }
+            io::SeekFrom::End(off) => {
+                self.cur_offset = self.size.checked_sub((-off) as u64).ok_or_else(|| {
+                    io::Error::new(io::ErrorKind::InvalidInput, "Seek before start of buffer")
+                })?;
+            }
+            io::SeekFrom::Current(std::i64::MIN) => {
+                return Err(io::Error::new(
+                    io::ErrorKind::InvalidInput,
+                    "Seek before start of buffer",
+                ));
+            }
+            io::SeekFrom::Current(off) => {
+                if off <= 0 {
+                    self.cur_offset =
+                        self.cur_offset.checked_sub((-off) as u64).ok_or_else(|| {
+                            io::Error::new(
+                                io::ErrorKind::InvalidInput,
+                                "Seek before start of buffer",
+                            )
+                        })?;
+                } else {
+                    self.cur_offset = std::cmp::min(
+                        self.size,
+                        self.cur_offset.checked_add(off as u64).unwrap_or(self.size),
+                    );
+                }
+            }
+        }
+
+        let (idx, _, skip) = self
+            .buffer
+            .as_ref()
+            .unwrap()
+            .find_memory(self.cur_offset as usize, None)
+            .expect("Failed to find memory");
+        self.cur_mem_idx = idx;
+        self.cur_mem_offset = skip;
+
+        Ok(self.cur_offset)
+    }
+
+    // Once stabilized
+    // fn stream_len(&mut self) -> Result<u64, io::Error> {
+    //     Ok(self.size)
+    // }
+    //
+    // fn stream_position(&mut self) -> Result<u64, io::Error> {
+    //     Ok(self.current_offset)
+    // }
+}
+
+impl<T> BufferCursor<T> {
+    pub fn stream_len(&mut self) -> Result<u64, io::Error> {
+        Ok(self.size)
+    }
+
+    pub fn stream_position(&mut self) -> Result<u64, io::Error> {
+        Ok(self.cur_offset)
+    }
+
+    pub fn get_buffer(&self) -> &BufferRef {
+        self.buffer.as_ref().unwrap().as_ref()
+    }
+
+    pub fn into_buffer(mut self) -> Buffer {
+        self.buffer.take().unwrap()
+    }
+}
+
+impl BufferCursor<Readable> {
+    fn new_readable(buffer: Buffer) -> BufferCursor<Readable> {
+        let size = buffer.get_size() as u64;
+        let num_mem = buffer.n_memory();
+
+        BufferCursor {
+            buffer: Some(buffer),
+            size,
+            num_mem,
+            cur_mem_idx: 0,
+            cur_offset: 0,
+            cur_mem_offset: 0,
+            map_info: unsafe { mem::zeroed() },
+            phantom: PhantomData,
+        }
+    }
+}
+
+impl BufferCursor<Writable> {
+    fn new_writable(buffer: Buffer) -> Result<BufferCursor<Writable>, glib::BoolError> {
+        if !buffer.is_writable() || !buffer.is_all_memory_writable() {
+            return Err(glib_bool_error!("Not all memories are writable"));
+        }
+
+        let size = buffer.get_size() as u64;
+        let num_mem = buffer.n_memory();
+
+        Ok(BufferCursor {
+            buffer: Some(buffer),
+            size,
+            num_mem,
+            cur_mem_idx: 0,
+            cur_offset: 0,
+            cur_mem_offset: 0,
+            map_info: unsafe { mem::zeroed() },
+            phantom: PhantomData,
+        })
+    }
+}
+
+unsafe impl<T> Send for BufferCursor<T> {}
+unsafe impl<T> Sync for BufferCursor<T> {}
+
+impl<T: fmt::Debug> fmt::Debug for BufferCursorRef<T> {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        f.debug_struct("BufferCursorRef")
+            .field("buffer", &self.buffer)
+            .field("size", &self.size)
+            .field("num_mem", &self.num_mem)
+            .field("cur_mem_idx", &self.cur_mem_idx)
+            .field("cur_offset", &self.cur_offset)
+            .field("cur_mem_offset", &self.cur_mem_offset)
+            .field("map_info", &self.map_info)
+            .finish()
+    }
+}
+
+impl<T> Drop for BufferCursorRef<T> {
+    fn drop(&mut self) {
+        if !self.map_info.memory.is_null() {
+            unsafe {
+                gst_sys::gst_memory_unmap(self.map_info.memory, &mut self.map_info);
+            }
+        }
+    }
+}
+impl<'a> io::Read for BufferCursorRef<&'a BufferRef> {
+    fn read(&mut self, mut data: &mut [u8]) -> Result<usize, io::Error> {
+        let mut copied = 0;
+
+        while !data.is_empty() && self.cur_mem_idx < self.num_mem {
+            // Map memory if needed. cur_mem_idx, cur_mem_offset and cur_offset are required to be
+            // set correctly here already (from constructor, seek and the bottom of the loop)
+            if self.map_info.memory.is_null() {
+                unsafe {
+                    let memory =
+                        gst_sys::gst_buffer_peek_memory(self.buffer.as_mut_ptr(), self.cur_mem_idx);
+                    assert!(!memory.is_null());
+
+                    if gst_sys::gst_memory_map(memory, &mut self.map_info, gst_sys::GST_MAP_READ)
+                        == glib_sys::GFALSE
+                    {
+                        return Err(io::Error::new(
+                            io::ErrorKind::InvalidData,
+                            "Failed to map memory readable",
+                        ));
+                    }
+                }
+
+                assert!(self.cur_mem_offset < self.map_info.size);
+            }
+
+            assert!(!self.map_info.memory.is_null());
+
+            // Copy all data we can currently copy
+            let data_left = self.map_info.size - self.cur_mem_offset;
+            let to_copy = std::cmp::min(data.len(), data_left);
+            unsafe {
+                ptr::copy_nonoverlapping(
+                    (self.map_info.data as *const u8).add(self.cur_mem_offset),
+                    data.as_mut_ptr(),
+                    to_copy,
+                );
+            }
+            copied += to_copy;
+            self.cur_offset += to_copy as u64;
+            self.cur_mem_offset += to_copy;
+            data = &mut data[to_copy..];
+
+            // If we're at the end of the current memory, unmap and advance to the next memory
+            if self.cur_mem_offset == self.map_info.size {
+                unsafe {
+                    gst_sys::gst_memory_unmap(self.map_info.memory, &mut self.map_info);
+                }
+                self.map_info.memory = ptr::null_mut();
+                self.cur_mem_idx += 1;
+                self.cur_mem_offset = 0;
+            }
+        }
+
+        Ok(copied)
+    }
+}
+
+impl<'a> io::Write for BufferCursorRef<&'a mut BufferRef> {
+    fn write(&mut self, mut data: &[u8]) -> Result<usize, io::Error> {
+        let mut copied = 0;
+
+        while !data.is_empty() && self.cur_mem_idx < self.num_mem {
+            // Map memory if needed. cur_mem_idx, cur_mem_offset and cur_offset are required to be
+            // set correctly here already (from constructor, seek and the bottom of the loop)
+            if self.map_info.memory.is_null() {
+                unsafe {
+                    let memory =
+                        gst_sys::gst_buffer_peek_memory(self.buffer.as_mut_ptr(), self.cur_mem_idx);
+                    assert!(!memory.is_null());
+
+                    if gst_sys::gst_memory_map(memory, &mut self.map_info, gst_sys::GST_MAP_WRITE)
+                        == glib_sys::GFALSE
+                    {
+                        return Err(io::Error::new(
+                            io::ErrorKind::InvalidData,
+                            "Failed to map memory writable",
+                        ));
+                    }
+                }
+
+                assert!(self.cur_mem_offset < self.map_info.size);
+            }
+
+            assert!(!self.map_info.memory.is_null());
+
+            // Copy all data we can currently copy
+            let data_left = self.map_info.size - self.cur_mem_offset;
+            let to_copy = std::cmp::min(data.len(), data_left);
+            unsafe {
+                ptr::copy_nonoverlapping(
+                    data.as_ptr(),
+                    (self.map_info.data as *mut u8).add(self.cur_mem_offset),
+                    to_copy,
+                );
+            }
+            copied += to_copy;
+            self.cur_offset += to_copy as u64;
+            self.cur_mem_offset += to_copy;
+            data = &data[to_copy..];
+
+            // If we're at the end of the current memory, unmap and advance to the next memory
+            if self.cur_mem_offset == self.map_info.size {
+                unsafe {
+                    gst_sys::gst_memory_unmap(self.map_info.memory, &mut self.map_info);
+                }
+                self.map_info.memory = ptr::null_mut();
+                self.cur_mem_idx += 1;
+                self.cur_mem_offset = 0;
+            }
+        }
+
+        Ok(copied)
+    }
+
+    fn flush(&mut self) -> Result<(), io::Error> {
+        Ok(())
+    }
+}
+impl<'a> io::Seek for BufferCursorRef<&'a BufferRef> {
+    fn seek(&mut self, pos: io::SeekFrom) -> Result<u64, io::Error> {
+        if !self.map_info.memory.is_null() {
+            unsafe {
+                gst_sys::gst_memory_unmap(self.map_info.memory, &mut self.map_info);
+                self.map_info.memory = ptr::null_mut();
+            }
+        }
+
+        match pos {
+            io::SeekFrom::Start(off) => {
+                self.cur_offset = std::cmp::min(self.size, off);
+            }
+            io::SeekFrom::End(off) if off >= 0 => {
+                self.cur_offset = self.size;
+            }
+            io::SeekFrom::End(off) => {
+                self.cur_offset = self.size.checked_sub((-off) as u64).ok_or_else(|| {
+                    io::Error::new(io::ErrorKind::InvalidInput, "Seek before start of buffer")
+                })?;
+            }
+            io::SeekFrom::Current(std::i64::MIN) => {
+                return Err(io::Error::new(
+                    io::ErrorKind::InvalidInput,
+                    "Seek before start of buffer",
+                ));
+            }
+            io::SeekFrom::Current(off) => {
+                if off <= 0 {
+                    self.cur_offset =
+                        self.cur_offset.checked_sub((-off) as u64).ok_or_else(|| {
+                            io::Error::new(
+                                io::ErrorKind::InvalidInput,
+                                "Seek before start of buffer",
+                            )
+                        })?;
+                } else {
+                    self.cur_offset = std::cmp::min(
+                        self.size,
+                        self.cur_offset.checked_add(off as u64).unwrap_or(self.size),
+                    );
+                }
+            }
+        }
+
+        let (idx, _, skip) = self
+            .buffer
+            .find_memory(self.cur_offset as usize, None)
+            .expect("Failed to find memory");
+        self.cur_mem_idx = idx;
+        self.cur_mem_offset = skip;
+
+        Ok(self.cur_offset)
+    }
+
+    // Once stabilized
+    // fn stream_len(&mut self) -> Result<u64, io::Error> {
+    //     Ok(self.size)
+    // }
+    //
+    // fn stream_position(&mut self) -> Result<u64, io::Error> {
+    //     Ok(self.current_offset)
+    // }
+}
+
+impl<'a> io::Seek for BufferCursorRef<&'a mut BufferRef> {
+    fn seek(&mut self, pos: io::SeekFrom) -> Result<u64, io::Error> {
+        if !self.map_info.memory.is_null() {
+            unsafe {
+                gst_sys::gst_memory_unmap(self.map_info.memory, &mut self.map_info);
+                self.map_info.memory = ptr::null_mut();
+            }
+        }
+
+        match pos {
+            io::SeekFrom::Start(off) => {
+                self.cur_offset = std::cmp::min(self.size, off);
+            }
+            io::SeekFrom::End(off) if off >= 0 => {
+                self.cur_offset = self.size;
+            }
+            io::SeekFrom::End(off) => {
+                self.cur_offset = self.size.checked_sub((-off) as u64).ok_or_else(|| {
+                    io::Error::new(io::ErrorKind::InvalidInput, "Seek before start of buffer")
+                })?;
+            }
+            io::SeekFrom::Current(std::i64::MIN) => {
+                return Err(io::Error::new(
+                    io::ErrorKind::InvalidInput,
+                    "Seek before start of buffer",
+                ));
+            }
+            io::SeekFrom::Current(off) => {
+                if off <= 0 {
+                    self.cur_offset =
+                        self.cur_offset.checked_sub((-off) as u64).ok_or_else(|| {
+                            io::Error::new(
+                                io::ErrorKind::InvalidInput,
+                                "Seek before start of buffer",
+                            )
+                        })?;
+                } else {
+                    self.cur_offset = std::cmp::min(
+                        self.size,
+                        self.cur_offset.checked_add(off as u64).unwrap_or(self.size),
+                    );
+                }
+            }
+        }
+
+        let (idx, _, skip) = self
+            .buffer
+            .find_memory(self.cur_offset as usize, None)
+            .expect("Failed to find memory");
+        self.cur_mem_idx = idx;
+        self.cur_mem_offset = skip;
+
+        Ok(self.cur_offset)
+    }
+
+    // Once stabilized
+    // fn stream_len(&mut self) -> Result<u64, io::Error> {
+    //     Ok(self.size)
+    // }
+    //
+    // fn stream_position(&mut self) -> Result<u64, io::Error> {
+    //     Ok(self.current_offset)
+    // }
+}
+
+impl<T> BufferCursorRef<T> {
+    pub fn stream_len(&mut self) -> Result<u64, io::Error> {
+        Ok(self.size)
+    }
+
+    pub fn stream_position(&mut self) -> Result<u64, io::Error> {
+        Ok(self.cur_offset)
+    }
+}
+
+impl<'a> BufferCursorRef<&'a BufferRef> {
+    pub fn get_buffer(&self) -> &BufferRef {
+        self.buffer
+    }
+
+    fn new_readable(buffer: &'a BufferRef) -> BufferCursorRef<&'a BufferRef> {
+        let size = buffer.get_size() as u64;
+        let num_mem = buffer.n_memory();
+
+        BufferCursorRef {
+            buffer,
+            size,
+            num_mem,
+            cur_mem_idx: 0,
+            cur_offset: 0,
+            cur_mem_offset: 0,
+            map_info: unsafe { mem::zeroed() },
+        }
+    }
+}
+impl<'a> BufferCursorRef<&'a mut BufferRef> {
+    pub fn get_buffer(&self) -> &BufferRef {
+        self.buffer
+    }
+
+    fn new_writable(
+        buffer: &'a mut BufferRef,
+    ) -> Result<BufferCursorRef<&'a mut BufferRef>, glib::BoolError> {
+        if !buffer.is_all_memory_writable() {
+            return Err(glib_bool_error!("Not all memories are writable"));
+        }
+
+        let size = buffer.get_size() as u64;
+        let num_mem = buffer.n_memory();
+
+        Ok(BufferCursorRef {
+            buffer,
+            size,
+            num_mem,
+            cur_mem_idx: 0,
+            cur_offset: 0,
+            cur_mem_offset: 0,
+            map_info: unsafe { mem::zeroed() },
+        })
+    }
+}
+
+unsafe impl<T> Send for BufferCursorRef<T> {}
+unsafe impl<T> Sync for BufferCursorRef<T> {}
+
 pub const BUFFER_COPY_METADATA: ::BufferCopyFlags =
     ::BufferCopyFlags::from_bits_truncate(gst_sys::GST_BUFFER_COPY_METADATA);
 pub const BUFFER_COPY_ALL: ::BufferCopyFlags =
diff --git a/gstreamer/src/lib.rs b/gstreamer/src/lib.rs
index 7ed4d60c9..dcf7ff7ca 100644
--- a/gstreamer/src/lib.rs
+++ b/gstreamer/src/lib.rs
@@ -126,7 +126,7 @@ pub use meta::ReferenceTimestampMeta;
 pub use meta::{Meta, MetaAPI, MetaRef, MetaRefMut, ParentBufferMeta};
 pub mod buffer;
 pub use buffer::{
-    Buffer, BufferMap, BufferRef, MappedBuffer, BUFFER_COPY_ALL, BUFFER_COPY_METADATA,
+    Buffer, BufferCursor, BufferMap, BufferRef, MappedBuffer, BUFFER_COPY_ALL, BUFFER_COPY_METADATA,
 };
 pub mod memory;
 pub use memory::{MappedMemory, Memory, MemoryMap, MemoryRef};
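
And a similarly rough sketch of the borrowed variants (illustrative only; the
existing Buffer::with_size, Buffer::append and get_mut are used here just to
obtain a writable buffer backed by two separate memories):

    use std::io::{Read, Seek, SeekFrom, Write};

    gst::init().unwrap();

    // Appending a second buffer yields one buffer backed by two memories.
    let mut buffer = gst::Buffer::with_size(4).unwrap();
    buffer.append(gst::Buffer::with_size(4).unwrap());

    {
        // Borrow the BufferRef mutably instead of consuming the Buffer.
        let buffer = buffer.get_mut().unwrap();
        let mut cursor = buffer.as_buffer_cursor_ref_writable().unwrap();
        cursor.write_all(&[1, 2, 3, 4, 5, 6]).unwrap(); // crosses the memory boundary
    }

    // Read back through an immutable BufferRef.
    let mut cursor = buffer.as_buffer_cursor_ref_readable();
    cursor.seek(SeekFrom::Start(4)).unwrap();
    let mut tail = [0u8; 2];
    cursor.read_exact(&mut tail).unwrap();
    assert_eq!(tail, [5, 6]);

Each memory is mapped only while the cursor is inside it and unmapped as soon
as the cursor moves past it or seeks away, so the memories are never merged
into a single allocation.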