From 42d09c283484bc05906d8f13c44adf12371ca765 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Dr=C3=B6ge?= Date: Tue, 7 Apr 2020 12:56:22 +0300 Subject: [PATCH] gstreamer/buffer: Move BufferCursor/BufferRefCursor into its own module It's quite a bit of code and can be kept nicely separate. --- gstreamer/src/buffer.rs | 835 +------------------------------- gstreamer/src/buffer_cursor.rs | 846 +++++++++++++++++++++++++++++++++ gstreamer/src/lib.rs | 5 +- 3 files changed, 859 insertions(+), 827 deletions(-) create mode 100644 gstreamer/src/buffer_cursor.rs diff --git a/gstreamer/src/buffer.rs b/gstreamer/src/buffer.rs index fa9f9bf78..d78c3cc96 100644 --- a/gstreamer/src/buffer.rs +++ b/gstreamer/src/buffer.rs @@ -1,4 +1,4 @@ -// Copyright (C) 2016-2017 Sebastian Dröge +// Copyright (C) 2016-2020 Sebastian Dröge // // Licensed under the Apache License, Version 2.0 or the MIT license @@ -7,7 +7,6 @@ // except according to those terms. use std::fmt; -use std::io; use std::marker::PhantomData; use std::mem; use std::ops; @@ -18,7 +17,9 @@ use std::usize; use meta::*; use miniobject::*; +use BufferCursor; use BufferFlags; +use BufferRefCursor; use ClockTime; use Memory; use MemoryRef; @@ -51,27 +52,6 @@ pub struct MappedBuffer { phantom: PhantomData, } -pub struct BufferCursor { - buffer: Option, - size: u64, - num_mem: u32, - cur_mem_idx: u32, - cur_offset: u64, - cur_mem_offset: usize, - map_info: gst_sys::GstMapInfo, - phantom: PhantomData, -} - -pub struct BufferCursorRef { - buffer: T, - size: u64, - num_mem: u32, - cur_mem_idx: u32, - cur_offset: u64, - cur_mem_offset: usize, - map_info: gst_sys::GstMapInfo, -} - impl Buffer { pub fn new() -> Self { assert_initialized_main_thread!(); @@ -181,11 +161,11 @@ impl Buffer { } } - pub fn into_buffer_cursor_readable(self) -> BufferCursor { + pub fn into_cursor_readable(self) -> BufferCursor { BufferCursor::new_readable(self) } - pub fn into_buffer_cursor_writable(self) -> Result, glib::BoolError> { + pub fn into_cursor_writable(self) -> Result, glib::BoolError> { BufferCursor::new_writable(self) } @@ -652,14 +632,14 @@ impl BufferRef { IterOwned::new(self) } - pub fn as_buffer_cursor_ref_readable<'a>(&'a self) -> BufferCursorRef<&'a BufferRef> { - BufferCursorRef::new_readable(self) + pub fn as_cursor_readable<'a>(&'a self) -> BufferRefCursor<&'a BufferRef> { + BufferRefCursor::new_readable(self) } - pub fn as_buffer_cursor_ref_writable<'a>( + pub fn as_cursor_writable<'a>( &'a mut self, - ) -> Result, glib::BoolError> { - BufferCursorRef::new_writable(self) + ) -> Result, glib::BoolError> { + BufferRefCursor::new_writable(self) } } @@ -1060,617 +1040,6 @@ impl Eq for MappedBuffer {} unsafe impl Send for MappedBuffer {} unsafe impl Sync for MappedBuffer {} -impl fmt::Debug for BufferCursor { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.debug_struct("BufferCursor") - .field("buffer", &self.buffer) - .field("size", &self.size) - .field("num_mem", &self.num_mem) - .field("cur_mem_idx", &self.cur_mem_idx) - .field("cur_offset", &self.cur_offset) - .field("cur_mem_offset", &self.cur_mem_offset) - .field("map_info", &self.map_info) - .finish() - } -} - -impl Drop for BufferCursor { - fn drop(&mut self) { - if !self.map_info.memory.is_null() { - unsafe { - gst_sys::gst_memory_unmap(self.map_info.memory, &mut self.map_info); - } - } - } -} - -impl io::Read for BufferCursor { - fn read(&mut self, mut data: &mut [u8]) -> Result { - let mut copied = 0; - - while !data.is_empty() && self.cur_mem_idx < self.num_mem { - // Map memory if needed. cur_mem_idx, cur_mem_offset and cur_offset are required to be - // set correctly here already (from constructor, seek and the bottom of the loop) - if self.map_info.memory.is_null() { - unsafe { - let memory = gst_sys::gst_buffer_peek_memory( - self.buffer.as_ref().unwrap().as_mut_ptr(), - self.cur_mem_idx, - ); - assert!(!memory.is_null()); - - if gst_sys::gst_memory_map(memory, &mut self.map_info, gst_sys::GST_MAP_READ) - == glib_sys::GFALSE - { - return Err(io::Error::new( - io::ErrorKind::InvalidData, - "Failed to map memory readable", - )); - } - } - - assert!(self.cur_mem_offset < self.map_info.size); - } - - assert!(!self.map_info.memory.is_null()); - - // Copy all data we can currently copy - let data_left = self.map_info.size - self.cur_mem_offset; - let to_copy = std::cmp::min(data.len(), data_left); - unsafe { - ptr::copy_nonoverlapping( - (self.map_info.data as *const u8).add(self.cur_mem_offset), - data.as_mut_ptr(), - to_copy, - ); - } - copied += to_copy; - self.cur_offset += to_copy as u64; - self.cur_mem_offset += to_copy; - data = &mut data[to_copy..]; - - // If we're at the end of the current memory, unmap and advance to the next memory - if self.cur_mem_offset == self.map_info.size { - unsafe { - gst_sys::gst_memory_unmap(self.map_info.memory, &mut self.map_info); - } - self.map_info.memory = ptr::null_mut(); - self.cur_mem_idx += 1; - self.cur_mem_offset = 0; - } - } - - Ok(copied) - } -} - -impl io::Write for BufferCursor { - fn write(&mut self, mut data: &[u8]) -> Result { - let mut copied = 0; - - while !data.is_empty() && self.cur_mem_idx < self.num_mem { - // Map memory if needed. cur_mem_idx, cur_mem_offset and cur_offset are required to be - // set correctly here already (from constructor, seek and the bottom of the loop) - if self.map_info.memory.is_null() { - unsafe { - let memory = gst_sys::gst_buffer_peek_memory( - self.buffer.as_ref().unwrap().as_mut_ptr(), - self.cur_mem_idx, - ); - assert!(!memory.is_null()); - - if gst_sys::gst_memory_map(memory, &mut self.map_info, gst_sys::GST_MAP_WRITE) - == glib_sys::GFALSE - { - return Err(io::Error::new( - io::ErrorKind::InvalidData, - "Failed to map memory writable", - )); - } - } - - assert!(self.cur_mem_offset < self.map_info.size); - } - - assert!(!self.map_info.memory.is_null()); - - // Copy all data we can currently copy - let data_left = self.map_info.size - self.cur_mem_offset; - let to_copy = std::cmp::min(data.len(), data_left); - unsafe { - ptr::copy_nonoverlapping( - data.as_ptr(), - (self.map_info.data as *mut u8).add(self.cur_mem_offset), - to_copy, - ); - } - copied += to_copy; - self.cur_offset += to_copy as u64; - self.cur_mem_offset += to_copy; - data = &data[to_copy..]; - - // If we're at the end of the current memory, unmap and advance to the next memory - if self.cur_mem_offset == self.map_info.size { - unsafe { - gst_sys::gst_memory_unmap(self.map_info.memory, &mut self.map_info); - } - self.map_info.memory = ptr::null_mut(); - self.cur_mem_idx += 1; - self.cur_mem_offset = 0; - } - } - - Ok(copied) - } - - fn flush(&mut self) -> Result<(), io::Error> { - Ok(()) - } -} - -impl io::Seek for BufferCursor { - fn seek(&mut self, pos: io::SeekFrom) -> Result { - if !self.map_info.memory.is_null() { - unsafe { - gst_sys::gst_memory_unmap(self.map_info.memory, &mut self.map_info); - self.map_info.memory = ptr::null_mut(); - } - } - - match pos { - io::SeekFrom::Start(off) => { - self.cur_offset = std::cmp::min(self.size, off); - } - io::SeekFrom::End(off) if off <= 0 => { - self.cur_offset = self.size; - } - io::SeekFrom::End(off) => { - self.cur_offset = self.size.checked_sub(off as u64).ok_or_else(|| { - io::Error::new(io::ErrorKind::InvalidInput, "Seek before start of buffer") - })?; - } - io::SeekFrom::Current(std::i64::MIN) => { - return Err(io::Error::new( - io::ErrorKind::InvalidInput, - "Seek before start of buffer", - )); - } - io::SeekFrom::Current(off) => { - if off <= 0 { - self.cur_offset = - self.cur_offset.checked_sub((-off) as u64).ok_or_else(|| { - io::Error::new( - io::ErrorKind::InvalidInput, - "Seek before start of buffer", - ) - })?; - } else { - self.cur_offset = std::cmp::min( - self.size, - self.cur_offset.checked_add(off as u64).unwrap_or(self.size), - ); - } - } - } - - let (idx, _, skip) = self - .buffer - .as_ref() - .unwrap() - .find_memory(self.cur_offset as usize, None) - .expect("Failed to find memory"); - self.cur_mem_idx = idx; - self.cur_mem_offset = skip; - - Ok(self.cur_offset) - } - - // Once stabilized - // fn stream_len(&mut self) -> Result { - // Ok(self.size) - // } - // - // fn stream_position(&mut self) -> Result { - // Ok(self.current_offset) - // } -} - -impl BufferCursor { - pub fn stream_len(&mut self) -> Result { - Ok(self.size) - } - - pub fn stream_position(&mut self) -> Result { - Ok(self.cur_offset) - } - - pub fn get_buffer(&self) -> &BufferRef { - self.buffer.as_ref().unwrap().as_ref() - } - - pub fn into_buffer(mut self) -> Buffer { - self.buffer.take().unwrap() - } -} - -impl BufferCursor { - fn new_readable(buffer: Buffer) -> BufferCursor { - let size = buffer.get_size() as u64; - let num_mem = buffer.n_memory(); - - BufferCursor { - buffer: Some(buffer), - size, - num_mem, - cur_mem_idx: 0, - cur_offset: 0, - cur_mem_offset: 0, - map_info: unsafe { mem::zeroed() }, - phantom: PhantomData, - } - } -} - -impl BufferCursor { - fn new_writable(buffer: Buffer) -> Result, glib::BoolError> { - if !buffer.is_writable() || !buffer.is_all_memory_writable() { - return Err(glib_bool_error!("Not all memories are writable")); - } - - let size = buffer.get_size() as u64; - let num_mem = buffer.n_memory(); - - Ok(BufferCursor { - buffer: Some(buffer), - size, - num_mem, - cur_mem_idx: 0, - cur_offset: 0, - cur_mem_offset: 0, - map_info: unsafe { mem::zeroed() }, - phantom: PhantomData, - }) - } -} - -unsafe impl Send for BufferCursor {} -unsafe impl Sync for BufferCursor {} - -impl fmt::Debug for BufferCursorRef { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.debug_struct("BufferCursorRef") - .field("buffer", &self.buffer) - .field("size", &self.size) - .field("num_mem", &self.num_mem) - .field("cur_mem_idx", &self.cur_mem_idx) - .field("cur_offset", &self.cur_offset) - .field("cur_mem_offset", &self.cur_mem_offset) - .field("map_info", &self.map_info) - .finish() - } -} - -impl Drop for BufferCursorRef { - fn drop(&mut self) { - if !self.map_info.memory.is_null() { - unsafe { - gst_sys::gst_memory_unmap(self.map_info.memory, &mut self.map_info); - } - } - } -} - -impl<'a> io::Read for BufferCursorRef<&'a BufferRef> { - fn read(&mut self, mut data: &mut [u8]) -> Result { - let mut copied = 0; - - while !data.is_empty() && self.cur_mem_idx < self.num_mem { - // Map memory if needed. cur_mem_idx, cur_mem_offset and cur_offset are required to be - // set correctly here already (from constructor, seek and the bottom of the loop) - if self.map_info.memory.is_null() { - unsafe { - let memory = - gst_sys::gst_buffer_peek_memory(self.buffer.as_mut_ptr(), self.cur_mem_idx); - assert!(!memory.is_null()); - - if gst_sys::gst_memory_map(memory, &mut self.map_info, gst_sys::GST_MAP_READ) - == glib_sys::GFALSE - { - return Err(io::Error::new( - io::ErrorKind::InvalidData, - "Failed to map memory readable", - )); - } - } - - assert!(self.cur_mem_offset < self.map_info.size); - } - - assert!(!self.map_info.memory.is_null()); - - // Copy all data we can currently copy - let data_left = self.map_info.size - self.cur_mem_offset; - let to_copy = std::cmp::min(data.len(), data_left); - unsafe { - ptr::copy_nonoverlapping( - (self.map_info.data as *const u8).add(self.cur_mem_offset), - data.as_mut_ptr(), - to_copy, - ); - } - copied += to_copy; - self.cur_offset += to_copy as u64; - self.cur_mem_offset += to_copy; - data = &mut data[to_copy..]; - - // If we're at the end of the current memory, unmap and advance to the next memory - if self.cur_mem_offset == self.map_info.size { - unsafe { - gst_sys::gst_memory_unmap(self.map_info.memory, &mut self.map_info); - } - self.map_info.memory = ptr::null_mut(); - self.cur_mem_idx += 1; - self.cur_mem_offset = 0; - } - } - - Ok(copied) - } -} - -impl<'a> io::Write for BufferCursorRef<&'a mut BufferRef> { - fn write(&mut self, mut data: &[u8]) -> Result { - let mut copied = 0; - - while !data.is_empty() && self.cur_mem_idx < self.num_mem { - // Map memory if needed. cur_mem_idx, cur_mem_offset and cur_offset are required to be - // set correctly here already (from constructor, seek and the bottom of the loop) - if self.map_info.memory.is_null() { - unsafe { - let memory = - gst_sys::gst_buffer_peek_memory(self.buffer.as_mut_ptr(), self.cur_mem_idx); - assert!(!memory.is_null()); - - if gst_sys::gst_memory_map(memory, &mut self.map_info, gst_sys::GST_MAP_WRITE) - == glib_sys::GFALSE - { - return Err(io::Error::new( - io::ErrorKind::InvalidData, - "Failed to map memory writable", - )); - } - } - - assert!(self.cur_mem_offset < self.map_info.size); - } - - assert!(!self.map_info.memory.is_null()); - - // Copy all data we can currently copy - let data_left = self.map_info.size - self.cur_mem_offset; - let to_copy = std::cmp::min(data.len(), data_left); - unsafe { - ptr::copy_nonoverlapping( - data.as_ptr(), - (self.map_info.data as *mut u8).add(self.cur_mem_offset), - to_copy, - ); - } - copied += to_copy; - self.cur_offset += to_copy as u64; - self.cur_mem_offset += to_copy; - data = &data[to_copy..]; - - // If we're at the end of the current memory, unmap and advance to the next memory - if self.cur_mem_offset == self.map_info.size { - unsafe { - gst_sys::gst_memory_unmap(self.map_info.memory, &mut self.map_info); - } - self.map_info.memory = ptr::null_mut(); - self.cur_mem_idx += 1; - self.cur_mem_offset = 0; - } - } - - Ok(copied) - } - - fn flush(&mut self) -> Result<(), io::Error> { - Ok(()) - } -} - -impl<'a> io::Seek for BufferCursorRef<&'a BufferRef> { - fn seek(&mut self, pos: io::SeekFrom) -> Result { - if !self.map_info.memory.is_null() { - unsafe { - gst_sys::gst_memory_unmap(self.map_info.memory, &mut self.map_info); - self.map_info.memory = ptr::null_mut(); - } - } - - match pos { - io::SeekFrom::Start(off) => { - self.cur_offset = std::cmp::min(self.size, off); - } - io::SeekFrom::End(off) if off <= 0 => { - self.cur_offset = self.size; - } - io::SeekFrom::End(off) => { - self.cur_offset = self.size.checked_sub(off as u64).ok_or_else(|| { - io::Error::new(io::ErrorKind::InvalidInput, "Seek before start of buffer") - })?; - } - io::SeekFrom::Current(std::i64::MIN) => { - return Err(io::Error::new( - io::ErrorKind::InvalidInput, - "Seek before start of buffer", - )); - } - io::SeekFrom::Current(off) => { - if off <= 0 { - self.cur_offset = - self.cur_offset.checked_sub((-off) as u64).ok_or_else(|| { - io::Error::new( - io::ErrorKind::InvalidInput, - "Seek before start of buffer", - ) - })?; - } else { - self.cur_offset = std::cmp::min( - self.size, - self.cur_offset.checked_add(off as u64).unwrap_or(self.size), - ); - } - } - } - - let (idx, _, skip) = self - .buffer - .find_memory(self.cur_offset as usize, None) - .expect("Failed to find memory"); - self.cur_mem_idx = idx; - self.cur_mem_offset = skip; - - Ok(self.cur_offset) - } - - // Once stabilized - // fn stream_len(&mut self) -> Result { - // Ok(self.size) - // } - // - // fn stream_position(&mut self) -> Result { - // Ok(self.current_offset) - // } -} - -impl<'a> io::Seek for BufferCursorRef<&'a mut BufferRef> { - fn seek(&mut self, pos: io::SeekFrom) -> Result { - if !self.map_info.memory.is_null() { - unsafe { - gst_sys::gst_memory_unmap(self.map_info.memory, &mut self.map_info); - self.map_info.memory = ptr::null_mut(); - } - } - - match pos { - io::SeekFrom::Start(off) => { - self.cur_offset = std::cmp::min(self.size, off); - } - io::SeekFrom::End(off) if off <= 0 => { - self.cur_offset = self.size; - } - io::SeekFrom::End(off) => { - self.cur_offset = self.size.checked_sub(off as u64).ok_or_else(|| { - io::Error::new(io::ErrorKind::InvalidInput, "Seek before start of buffer") - })?; - } - io::SeekFrom::Current(std::i64::MIN) => { - return Err(io::Error::new( - io::ErrorKind::InvalidInput, - "Seek before start of buffer", - )); - } - io::SeekFrom::Current(off) => { - if off <= 0 { - self.cur_offset = - self.cur_offset.checked_sub((-off) as u64).ok_or_else(|| { - io::Error::new( - io::ErrorKind::InvalidInput, - "Seek before start of buffer", - ) - })?; - } else { - self.cur_offset = std::cmp::min( - self.size, - self.cur_offset.checked_add(off as u64).unwrap_or(self.size), - ); - } - } - } - - let (idx, _, skip) = self - .buffer - .find_memory(self.cur_offset as usize, None) - .expect("Failed to find memory"); - self.cur_mem_idx = idx; - self.cur_mem_offset = skip; - - Ok(self.cur_offset) - } - - // Once stabilized - // fn stream_len(&mut self) -> Result { - // Ok(self.size) - // } - // - // fn stream_position(&mut self) -> Result { - // Ok(self.current_offset) - // } -} - -impl BufferCursorRef { - pub fn stream_len(&mut self) -> Result { - Ok(self.size) - } - - pub fn stream_position(&mut self) -> Result { - Ok(self.cur_offset) - } -} - -impl<'a> BufferCursorRef<&'a BufferRef> { - pub fn get_buffer(&self) -> &BufferRef { - self.buffer - } - - fn new_readable(buffer: &'a BufferRef) -> BufferCursorRef<&'a BufferRef> { - let size = buffer.get_size() as u64; - let num_mem = buffer.n_memory(); - - BufferCursorRef { - buffer, - size, - num_mem, - cur_mem_idx: 0, - cur_offset: 0, - cur_mem_offset: 0, - map_info: unsafe { mem::zeroed() }, - } - } -} - -impl<'a> BufferCursorRef<&'a mut BufferRef> { - pub fn get_buffer(&self) -> &BufferRef { - self.buffer - } - - fn new_writable( - buffer: &'a mut BufferRef, - ) -> Result, glib::BoolError> { - if !buffer.is_all_memory_writable() { - return Err(glib_bool_error!("Not all memories are writable")); - } - - let size = buffer.get_size() as u64; - let num_mem = buffer.n_memory(); - - Ok(BufferCursorRef { - buffer, - size, - num_mem, - cur_mem_idx: 0, - cur_offset: 0, - cur_mem_offset: 0, - map_info: unsafe { mem::zeroed() }, - }) - } -} - -unsafe impl Send for BufferCursorRef {} -unsafe impl Sync for BufferCursorRef {} - pub const BUFFER_COPY_METADATA: ::BufferCopyFlags = ::BufferCopyFlags::from_bits_truncate(gst_sys::GST_BUFFER_COPY_METADATA); pub const BUFFER_COPY_ALL: ::BufferCopyFlags = @@ -1856,188 +1225,4 @@ mod tests { assert_eq!(last, 4); } - - #[test] - fn test_buffer_cursor() { - use std::io::{self, Read, Seek, Write}; - - ::init().unwrap(); - - let mut buffer = Buffer::new(); - { - let buffer = buffer.get_mut().unwrap(); - buffer.append_memory(::Memory::from_mut_slice(vec![0; 5])); - buffer.append_memory(::Memory::from_mut_slice(vec![0; 5])); - buffer.append_memory(::Memory::from_mut_slice(vec![0; 5])); - buffer.append_memory(::Memory::from_mut_slice(vec![0; 5])); - buffer.append_memory(::Memory::from_mut_slice(vec![0; 10])); - } - - assert!(buffer.is_all_memory_writable()); - assert_eq!(buffer.n_memory(), 5); - assert_eq!(buffer.get_size(), 30); - - let mut cursor = buffer.into_buffer_cursor_writable().unwrap(); - assert_eq!(cursor.stream_position().unwrap(), 0); - cursor.write_all(b"01234567").unwrap(); - assert_eq!(cursor.stream_position().unwrap(), 8); - cursor.write_all(b"890123").unwrap(); - assert_eq!(cursor.stream_position().unwrap(), 14); - cursor.write_all(b"456").unwrap(); - assert_eq!(cursor.stream_position().unwrap(), 17); - cursor.write_all(b"78901234567").unwrap(); - assert_eq!(cursor.stream_position().unwrap(), 28); - cursor.write_all(b"89").unwrap(); - assert_eq!(cursor.stream_position().unwrap(), 30); - assert!(cursor.write_all(b"0").is_err()); - - assert_eq!(cursor.seek(io::SeekFrom::Start(5)).unwrap(), 5); - assert_eq!(cursor.stream_position().unwrap(), 5); - cursor.write_all(b"A").unwrap(); - - assert_eq!(cursor.seek(io::SeekFrom::End(5)).unwrap(), 25); - assert_eq!(cursor.stream_position().unwrap(), 25); - cursor.write_all(b"B").unwrap(); - - assert_eq!(cursor.seek(io::SeekFrom::Current(-1)).unwrap(), 25); - assert_eq!(cursor.stream_position().unwrap(), 25); - cursor.write_all(b"C").unwrap(); - - assert_eq!(cursor.seek(io::SeekFrom::Current(1)).unwrap(), 27); - assert_eq!(cursor.stream_position().unwrap(), 27); - cursor.write_all(b"D").unwrap(); - - let buffer = cursor.into_buffer(); - - let mut cursor = buffer.into_buffer_cursor_readable(); - let mut data = [0; 30]; - - assert_eq!(cursor.stream_position().unwrap(), 0); - cursor.read_exact(&mut data[0..7]).unwrap(); - assert_eq!(cursor.stream_position().unwrap(), 7); - assert_eq!(&data[0..7], b"01234A6"); - cursor.read_exact(&mut data[0..5]).unwrap(); - assert_eq!(cursor.stream_position().unwrap(), 12); - assert_eq!(&data[0..5], b"78901"); - cursor.read_exact(&mut data[0..10]).unwrap(); - assert_eq!(cursor.stream_position().unwrap(), 22); - assert_eq!(&data[0..10], b"2345678901"); - cursor.read_exact(&mut data[0..8]).unwrap(); - assert_eq!(cursor.stream_position().unwrap(), 30); - assert_eq!(&data[0..8], b"234C6D89"); - assert!(cursor.read_exact(&mut data[0..1]).is_err()); - - assert_eq!(cursor.seek(io::SeekFrom::Start(5)).unwrap(), 5); - assert_eq!(cursor.stream_position().unwrap(), 5); - cursor.read_exact(&mut data[0..1]).unwrap(); - assert_eq!(&data[0..1], b"A"); - - assert_eq!(cursor.seek(io::SeekFrom::End(5)).unwrap(), 25); - assert_eq!(cursor.stream_position().unwrap(), 25); - cursor.read_exact(&mut data[0..1]).unwrap(); - assert_eq!(&data[0..1], b"C"); - - assert_eq!(cursor.seek(io::SeekFrom::Current(-1)).unwrap(), 25); - assert_eq!(cursor.stream_position().unwrap(), 25); - cursor.read_exact(&mut data[0..1]).unwrap(); - assert_eq!(&data[0..1], b"C"); - - assert_eq!(cursor.seek(io::SeekFrom::Current(1)).unwrap(), 27); - assert_eq!(cursor.stream_position().unwrap(), 27); - cursor.read_exact(&mut data[0..1]).unwrap(); - assert_eq!(&data[0..1], b"D"); - } - - #[test] - fn test_buffer_cursor_ref() { - use std::io::{self, Read, Seek, Write}; - - ::init().unwrap(); - - let mut buffer = Buffer::new(); - { - let buffer = buffer.get_mut().unwrap(); - buffer.append_memory(::Memory::from_mut_slice(vec![0; 5])); - buffer.append_memory(::Memory::from_mut_slice(vec![0; 5])); - buffer.append_memory(::Memory::from_mut_slice(vec![0; 5])); - buffer.append_memory(::Memory::from_mut_slice(vec![0; 5])); - buffer.append_memory(::Memory::from_mut_slice(vec![0; 10])); - } - - assert!(buffer.is_all_memory_writable()); - assert_eq!(buffer.n_memory(), 5); - assert_eq!(buffer.get_size(), 30); - - { - let buffer = buffer.get_mut().unwrap(); - - let mut cursor = buffer.as_buffer_cursor_ref_writable().unwrap(); - assert_eq!(cursor.stream_position().unwrap(), 0); - cursor.write_all(b"01234567").unwrap(); - assert_eq!(cursor.stream_position().unwrap(), 8); - cursor.write_all(b"890123").unwrap(); - assert_eq!(cursor.stream_position().unwrap(), 14); - cursor.write_all(b"456").unwrap(); - assert_eq!(cursor.stream_position().unwrap(), 17); - cursor.write_all(b"78901234567").unwrap(); - assert_eq!(cursor.stream_position().unwrap(), 28); - cursor.write_all(b"89").unwrap(); - assert_eq!(cursor.stream_position().unwrap(), 30); - assert!(cursor.write_all(b"0").is_err()); - - assert_eq!(cursor.seek(io::SeekFrom::Start(5)).unwrap(), 5); - assert_eq!(cursor.stream_position().unwrap(), 5); - cursor.write_all(b"A").unwrap(); - - assert_eq!(cursor.seek(io::SeekFrom::End(5)).unwrap(), 25); - assert_eq!(cursor.stream_position().unwrap(), 25); - cursor.write_all(b"B").unwrap(); - - assert_eq!(cursor.seek(io::SeekFrom::Current(-1)).unwrap(), 25); - assert_eq!(cursor.stream_position().unwrap(), 25); - cursor.write_all(b"C").unwrap(); - - assert_eq!(cursor.seek(io::SeekFrom::Current(1)).unwrap(), 27); - assert_eq!(cursor.stream_position().unwrap(), 27); - cursor.write_all(b"D").unwrap(); - } - - let mut cursor = buffer.as_buffer_cursor_ref_readable(); - let mut data = [0; 30]; - - assert_eq!(cursor.stream_position().unwrap(), 0); - cursor.read_exact(&mut data[0..7]).unwrap(); - assert_eq!(cursor.stream_position().unwrap(), 7); - assert_eq!(&data[0..7], b"01234A6"); - cursor.read_exact(&mut data[0..5]).unwrap(); - assert_eq!(cursor.stream_position().unwrap(), 12); - assert_eq!(&data[0..5], b"78901"); - cursor.read_exact(&mut data[0..10]).unwrap(); - assert_eq!(cursor.stream_position().unwrap(), 22); - assert_eq!(&data[0..10], b"2345678901"); - cursor.read_exact(&mut data[0..8]).unwrap(); - assert_eq!(cursor.stream_position().unwrap(), 30); - assert_eq!(&data[0..8], b"234C6D89"); - assert!(cursor.read_exact(&mut data[0..1]).is_err()); - - assert_eq!(cursor.seek(io::SeekFrom::Start(5)).unwrap(), 5); - assert_eq!(cursor.stream_position().unwrap(), 5); - cursor.read_exact(&mut data[0..1]).unwrap(); - assert_eq!(&data[0..1], b"A"); - - assert_eq!(cursor.seek(io::SeekFrom::End(5)).unwrap(), 25); - assert_eq!(cursor.stream_position().unwrap(), 25); - cursor.read_exact(&mut data[0..1]).unwrap(); - assert_eq!(&data[0..1], b"C"); - - assert_eq!(cursor.seek(io::SeekFrom::Current(-1)).unwrap(), 25); - assert_eq!(cursor.stream_position().unwrap(), 25); - cursor.read_exact(&mut data[0..1]).unwrap(); - assert_eq!(&data[0..1], b"C"); - - assert_eq!(cursor.seek(io::SeekFrom::Current(1)).unwrap(), 27); - assert_eq!(cursor.stream_position().unwrap(), 27); - cursor.read_exact(&mut data[0..1]).unwrap(); - assert_eq!(&data[0..1], b"D"); - } } diff --git a/gstreamer/src/buffer_cursor.rs b/gstreamer/src/buffer_cursor.rs new file mode 100644 index 000000000..e4072beb3 --- /dev/null +++ b/gstreamer/src/buffer_cursor.rs @@ -0,0 +1,846 @@ +// Copyright (C) 2020 Sebastian Dröge +// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +use std::fmt; +use std::io; +use std::marker::PhantomData; +use std::mem; +use std::ptr; +use std::u64; +use std::usize; + +use miniobject::*; +use Buffer; +use BufferRef; + +use glib; +use glib_sys; +use gst_sys; + +use buffer::{Readable, Writable}; + +pub struct BufferCursor { + buffer: Option, + size: u64, + num_mem: u32, + cur_mem_idx: u32, + cur_offset: u64, + cur_mem_offset: usize, + map_info: gst_sys::GstMapInfo, + phantom: PhantomData, +} + +pub struct BufferRefCursor { + buffer: T, + size: u64, + num_mem: u32, + cur_mem_idx: u32, + cur_offset: u64, + cur_mem_offset: usize, + map_info: gst_sys::GstMapInfo, +} + +impl fmt::Debug for BufferCursor { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("BufferCursor") + .field("buffer", &self.buffer) + .field("size", &self.size) + .field("num_mem", &self.num_mem) + .field("cur_mem_idx", &self.cur_mem_idx) + .field("cur_offset", &self.cur_offset) + .field("cur_mem_offset", &self.cur_mem_offset) + .field("map_info", &self.map_info) + .finish() + } +} + +impl Drop for BufferCursor { + fn drop(&mut self) { + if !self.map_info.memory.is_null() { + unsafe { + gst_sys::gst_memory_unmap(self.map_info.memory, &mut self.map_info); + } + } + } +} + +impl io::Read for BufferCursor { + fn read(&mut self, mut data: &mut [u8]) -> Result { + let mut copied = 0; + + while !data.is_empty() && self.cur_mem_idx < self.num_mem { + // Map memory if needed. cur_mem_idx, cur_mem_offset and cur_offset are required to be + // set correctly here already (from constructor, seek and the bottom of the loop) + if self.map_info.memory.is_null() { + unsafe { + let memory = gst_sys::gst_buffer_peek_memory( + self.buffer.as_ref().unwrap().as_mut_ptr(), + self.cur_mem_idx, + ); + assert!(!memory.is_null()); + + if gst_sys::gst_memory_map(memory, &mut self.map_info, gst_sys::GST_MAP_READ) + == glib_sys::GFALSE + { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + "Failed to map memory readable", + )); + } + } + + assert!(self.cur_mem_offset < self.map_info.size); + } + + assert!(!self.map_info.memory.is_null()); + + // Copy all data we can currently copy + let data_left = self.map_info.size - self.cur_mem_offset; + let to_copy = std::cmp::min(data.len(), data_left); + unsafe { + ptr::copy_nonoverlapping( + (self.map_info.data as *const u8).add(self.cur_mem_offset), + data.as_mut_ptr(), + to_copy, + ); + } + copied += to_copy; + self.cur_offset += to_copy as u64; + self.cur_mem_offset += to_copy; + data = &mut data[to_copy..]; + + // If we're at the end of the current memory, unmap and advance to the next memory + if self.cur_mem_offset == self.map_info.size { + unsafe { + gst_sys::gst_memory_unmap(self.map_info.memory, &mut self.map_info); + } + self.map_info.memory = ptr::null_mut(); + self.cur_mem_idx += 1; + self.cur_mem_offset = 0; + } + } + + Ok(copied) + } +} + +impl io::Write for BufferCursor { + fn write(&mut self, mut data: &[u8]) -> Result { + let mut copied = 0; + + while !data.is_empty() && self.cur_mem_idx < self.num_mem { + // Map memory if needed. cur_mem_idx, cur_mem_offset and cur_offset are required to be + // set correctly here already (from constructor, seek and the bottom of the loop) + if self.map_info.memory.is_null() { + unsafe { + let memory = gst_sys::gst_buffer_peek_memory( + self.buffer.as_ref().unwrap().as_mut_ptr(), + self.cur_mem_idx, + ); + assert!(!memory.is_null()); + + if gst_sys::gst_memory_map(memory, &mut self.map_info, gst_sys::GST_MAP_WRITE) + == glib_sys::GFALSE + { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + "Failed to map memory writable", + )); + } + } + + assert!(self.cur_mem_offset < self.map_info.size); + } + + assert!(!self.map_info.memory.is_null()); + + // Copy all data we can currently copy + let data_left = self.map_info.size - self.cur_mem_offset; + let to_copy = std::cmp::min(data.len(), data_left); + unsafe { + ptr::copy_nonoverlapping( + data.as_ptr(), + (self.map_info.data as *mut u8).add(self.cur_mem_offset), + to_copy, + ); + } + copied += to_copy; + self.cur_offset += to_copy as u64; + self.cur_mem_offset += to_copy; + data = &data[to_copy..]; + + // If we're at the end of the current memory, unmap and advance to the next memory + if self.cur_mem_offset == self.map_info.size { + unsafe { + gst_sys::gst_memory_unmap(self.map_info.memory, &mut self.map_info); + } + self.map_info.memory = ptr::null_mut(); + self.cur_mem_idx += 1; + self.cur_mem_offset = 0; + } + } + + Ok(copied) + } + + fn flush(&mut self) -> Result<(), io::Error> { + Ok(()) + } +} + +impl io::Seek for BufferCursor { + fn seek(&mut self, pos: io::SeekFrom) -> Result { + if !self.map_info.memory.is_null() { + unsafe { + gst_sys::gst_memory_unmap(self.map_info.memory, &mut self.map_info); + self.map_info.memory = ptr::null_mut(); + } + } + + match pos { + io::SeekFrom::Start(off) => { + self.cur_offset = std::cmp::min(self.size, off); + } + io::SeekFrom::End(off) if off <= 0 => { + self.cur_offset = self.size; + } + io::SeekFrom::End(off) => { + self.cur_offset = self.size.checked_sub(off as u64).ok_or_else(|| { + io::Error::new(io::ErrorKind::InvalidInput, "Seek before start of buffer") + })?; + } + io::SeekFrom::Current(std::i64::MIN) => { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + "Seek before start of buffer", + )); + } + io::SeekFrom::Current(off) => { + if off <= 0 { + self.cur_offset = + self.cur_offset.checked_sub((-off) as u64).ok_or_else(|| { + io::Error::new( + io::ErrorKind::InvalidInput, + "Seek before start of buffer", + ) + })?; + } else { + self.cur_offset = std::cmp::min( + self.size, + self.cur_offset.checked_add(off as u64).unwrap_or(self.size), + ); + } + } + } + + let (idx, _, skip) = self + .buffer + .as_ref() + .unwrap() + .find_memory(self.cur_offset as usize, None) + .expect("Failed to find memory"); + self.cur_mem_idx = idx; + self.cur_mem_offset = skip; + + Ok(self.cur_offset) + } + + // Once stabilized + // fn stream_len(&mut self) -> Result { + // Ok(self.size) + // } + // + // fn stream_position(&mut self) -> Result { + // Ok(self.current_offset) + // } +} + +impl BufferCursor { + pub fn stream_len(&mut self) -> Result { + Ok(self.size) + } + + pub fn stream_position(&mut self) -> Result { + Ok(self.cur_offset) + } + + pub fn get_buffer(&self) -> &BufferRef { + self.buffer.as_ref().unwrap().as_ref() + } + + pub fn into_buffer(mut self) -> Buffer { + self.buffer.take().unwrap() + } +} + +impl BufferCursor { + pub(crate) fn new_readable(buffer: Buffer) -> BufferCursor { + let size = buffer.get_size() as u64; + let num_mem = buffer.n_memory(); + + BufferCursor { + buffer: Some(buffer), + size, + num_mem, + cur_mem_idx: 0, + cur_offset: 0, + cur_mem_offset: 0, + map_info: unsafe { mem::zeroed() }, + phantom: PhantomData, + } + } +} + +impl BufferCursor { + pub(crate) fn new_writable(buffer: Buffer) -> Result, glib::BoolError> { + if !buffer.is_writable() || !buffer.is_all_memory_writable() { + return Err(glib_bool_error!("Not all memories are writable")); + } + + let size = buffer.get_size() as u64; + let num_mem = buffer.n_memory(); + + Ok(BufferCursor { + buffer: Some(buffer), + size, + num_mem, + cur_mem_idx: 0, + cur_offset: 0, + cur_mem_offset: 0, + map_info: unsafe { mem::zeroed() }, + phantom: PhantomData, + }) + } +} + +unsafe impl Send for BufferCursor {} +unsafe impl Sync for BufferCursor {} + +impl fmt::Debug for BufferRefCursor { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("BufferRefCursor") + .field("buffer", &self.buffer) + .field("size", &self.size) + .field("num_mem", &self.num_mem) + .field("cur_mem_idx", &self.cur_mem_idx) + .field("cur_offset", &self.cur_offset) + .field("cur_mem_offset", &self.cur_mem_offset) + .field("map_info", &self.map_info) + .finish() + } +} + +impl Drop for BufferRefCursor { + fn drop(&mut self) { + if !self.map_info.memory.is_null() { + unsafe { + gst_sys::gst_memory_unmap(self.map_info.memory, &mut self.map_info); + } + } + } +} + +impl<'a> io::Read for BufferRefCursor<&'a BufferRef> { + fn read(&mut self, mut data: &mut [u8]) -> Result { + let mut copied = 0; + + while !data.is_empty() && self.cur_mem_idx < self.num_mem { + // Map memory if needed. cur_mem_idx, cur_mem_offset and cur_offset are required to be + // set correctly here already (from constructor, seek and the bottom of the loop) + if self.map_info.memory.is_null() { + unsafe { + let memory = + gst_sys::gst_buffer_peek_memory(self.buffer.as_mut_ptr(), self.cur_mem_idx); + assert!(!memory.is_null()); + + if gst_sys::gst_memory_map(memory, &mut self.map_info, gst_sys::GST_MAP_READ) + == glib_sys::GFALSE + { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + "Failed to map memory readable", + )); + } + } + + assert!(self.cur_mem_offset < self.map_info.size); + } + + assert!(!self.map_info.memory.is_null()); + + // Copy all data we can currently copy + let data_left = self.map_info.size - self.cur_mem_offset; + let to_copy = std::cmp::min(data.len(), data_left); + unsafe { + ptr::copy_nonoverlapping( + (self.map_info.data as *const u8).add(self.cur_mem_offset), + data.as_mut_ptr(), + to_copy, + ); + } + copied += to_copy; + self.cur_offset += to_copy as u64; + self.cur_mem_offset += to_copy; + data = &mut data[to_copy..]; + + // If we're at the end of the current memory, unmap and advance to the next memory + if self.cur_mem_offset == self.map_info.size { + unsafe { + gst_sys::gst_memory_unmap(self.map_info.memory, &mut self.map_info); + } + self.map_info.memory = ptr::null_mut(); + self.cur_mem_idx += 1; + self.cur_mem_offset = 0; + } + } + + Ok(copied) + } +} + +impl<'a> io::Write for BufferRefCursor<&'a mut BufferRef> { + fn write(&mut self, mut data: &[u8]) -> Result { + let mut copied = 0; + + while !data.is_empty() && self.cur_mem_idx < self.num_mem { + // Map memory if needed. cur_mem_idx, cur_mem_offset and cur_offset are required to be + // set correctly here already (from constructor, seek and the bottom of the loop) + if self.map_info.memory.is_null() { + unsafe { + let memory = + gst_sys::gst_buffer_peek_memory(self.buffer.as_mut_ptr(), self.cur_mem_idx); + assert!(!memory.is_null()); + + if gst_sys::gst_memory_map(memory, &mut self.map_info, gst_sys::GST_MAP_WRITE) + == glib_sys::GFALSE + { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + "Failed to map memory writable", + )); + } + } + + assert!(self.cur_mem_offset < self.map_info.size); + } + + assert!(!self.map_info.memory.is_null()); + + // Copy all data we can currently copy + let data_left = self.map_info.size - self.cur_mem_offset; + let to_copy = std::cmp::min(data.len(), data_left); + unsafe { + ptr::copy_nonoverlapping( + data.as_ptr(), + (self.map_info.data as *mut u8).add(self.cur_mem_offset), + to_copy, + ); + } + copied += to_copy; + self.cur_offset += to_copy as u64; + self.cur_mem_offset += to_copy; + data = &data[to_copy..]; + + // If we're at the end of the current memory, unmap and advance to the next memory + if self.cur_mem_offset == self.map_info.size { + unsafe { + gst_sys::gst_memory_unmap(self.map_info.memory, &mut self.map_info); + } + self.map_info.memory = ptr::null_mut(); + self.cur_mem_idx += 1; + self.cur_mem_offset = 0; + } + } + + Ok(copied) + } + + fn flush(&mut self) -> Result<(), io::Error> { + Ok(()) + } +} + +impl<'a> io::Seek for BufferRefCursor<&'a BufferRef> { + fn seek(&mut self, pos: io::SeekFrom) -> Result { + if !self.map_info.memory.is_null() { + unsafe { + gst_sys::gst_memory_unmap(self.map_info.memory, &mut self.map_info); + self.map_info.memory = ptr::null_mut(); + } + } + + match pos { + io::SeekFrom::Start(off) => { + self.cur_offset = std::cmp::min(self.size, off); + } + io::SeekFrom::End(off) if off <= 0 => { + self.cur_offset = self.size; + } + io::SeekFrom::End(off) => { + self.cur_offset = self.size.checked_sub(off as u64).ok_or_else(|| { + io::Error::new(io::ErrorKind::InvalidInput, "Seek before start of buffer") + })?; + } + io::SeekFrom::Current(std::i64::MIN) => { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + "Seek before start of buffer", + )); + } + io::SeekFrom::Current(off) => { + if off <= 0 { + self.cur_offset = + self.cur_offset.checked_sub((-off) as u64).ok_or_else(|| { + io::Error::new( + io::ErrorKind::InvalidInput, + "Seek before start of buffer", + ) + })?; + } else { + self.cur_offset = std::cmp::min( + self.size, + self.cur_offset.checked_add(off as u64).unwrap_or(self.size), + ); + } + } + } + + let (idx, _, skip) = self + .buffer + .find_memory(self.cur_offset as usize, None) + .expect("Failed to find memory"); + self.cur_mem_idx = idx; + self.cur_mem_offset = skip; + + Ok(self.cur_offset) + } + + // Once stabilized + // fn stream_len(&mut self) -> Result { + // Ok(self.size) + // } + // + // fn stream_position(&mut self) -> Result { + // Ok(self.current_offset) + // } +} + +impl<'a> io::Seek for BufferRefCursor<&'a mut BufferRef> { + fn seek(&mut self, pos: io::SeekFrom) -> Result { + if !self.map_info.memory.is_null() { + unsafe { + gst_sys::gst_memory_unmap(self.map_info.memory, &mut self.map_info); + self.map_info.memory = ptr::null_mut(); + } + } + + match pos { + io::SeekFrom::Start(off) => { + self.cur_offset = std::cmp::min(self.size, off); + } + io::SeekFrom::End(off) if off <= 0 => { + self.cur_offset = self.size; + } + io::SeekFrom::End(off) => { + self.cur_offset = self.size.checked_sub(off as u64).ok_or_else(|| { + io::Error::new(io::ErrorKind::InvalidInput, "Seek before start of buffer") + })?; + } + io::SeekFrom::Current(std::i64::MIN) => { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + "Seek before start of buffer", + )); + } + io::SeekFrom::Current(off) => { + if off <= 0 { + self.cur_offset = + self.cur_offset.checked_sub((-off) as u64).ok_or_else(|| { + io::Error::new( + io::ErrorKind::InvalidInput, + "Seek before start of buffer", + ) + })?; + } else { + self.cur_offset = std::cmp::min( + self.size, + self.cur_offset.checked_add(off as u64).unwrap_or(self.size), + ); + } + } + } + + let (idx, _, skip) = self + .buffer + .find_memory(self.cur_offset as usize, None) + .expect("Failed to find memory"); + self.cur_mem_idx = idx; + self.cur_mem_offset = skip; + + Ok(self.cur_offset) + } + + // Once stabilized + // fn stream_len(&mut self) -> Result { + // Ok(self.size) + // } + // + // fn stream_position(&mut self) -> Result { + // Ok(self.current_offset) + // } +} + +impl BufferRefCursor { + pub fn stream_len(&mut self) -> Result { + Ok(self.size) + } + + pub fn stream_position(&mut self) -> Result { + Ok(self.cur_offset) + } +} + +impl<'a> BufferRefCursor<&'a BufferRef> { + pub fn get_buffer(&self) -> &BufferRef { + self.buffer + } + + pub(crate) fn new_readable(buffer: &'a BufferRef) -> BufferRefCursor<&'a BufferRef> { + let size = buffer.get_size() as u64; + let num_mem = buffer.n_memory(); + + BufferRefCursor { + buffer, + size, + num_mem, + cur_mem_idx: 0, + cur_offset: 0, + cur_mem_offset: 0, + map_info: unsafe { mem::zeroed() }, + } + } +} + +impl<'a> BufferRefCursor<&'a mut BufferRef> { + pub fn get_buffer(&self) -> &BufferRef { + self.buffer + } + + pub(crate) fn new_writable( + buffer: &'a mut BufferRef, + ) -> Result, glib::BoolError> { + if !buffer.is_all_memory_writable() { + return Err(glib_bool_error!("Not all memories are writable")); + } + + let size = buffer.get_size() as u64; + let num_mem = buffer.n_memory(); + + Ok(BufferRefCursor { + buffer, + size, + num_mem, + cur_mem_idx: 0, + cur_offset: 0, + cur_mem_offset: 0, + map_info: unsafe { mem::zeroed() }, + }) + } +} + +unsafe impl Send for BufferRefCursor {} +unsafe impl Sync for BufferRefCursor {} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_buffer_cursor() { + use std::io::{self, Read, Seek, Write}; + + ::init().unwrap(); + + let mut buffer = Buffer::new(); + { + let buffer = buffer.get_mut().unwrap(); + buffer.append_memory(::Memory::from_mut_slice(vec![0; 5])); + buffer.append_memory(::Memory::from_mut_slice(vec![0; 5])); + buffer.append_memory(::Memory::from_mut_slice(vec![0; 5])); + buffer.append_memory(::Memory::from_mut_slice(vec![0; 5])); + buffer.append_memory(::Memory::from_mut_slice(vec![0; 10])); + } + + assert!(buffer.is_all_memory_writable()); + assert_eq!(buffer.n_memory(), 5); + assert_eq!(buffer.get_size(), 30); + + let mut cursor = buffer.into_cursor_writable().unwrap(); + assert_eq!(cursor.stream_position().unwrap(), 0); + cursor.write_all(b"01234567").unwrap(); + assert_eq!(cursor.stream_position().unwrap(), 8); + cursor.write_all(b"890123").unwrap(); + assert_eq!(cursor.stream_position().unwrap(), 14); + cursor.write_all(b"456").unwrap(); + assert_eq!(cursor.stream_position().unwrap(), 17); + cursor.write_all(b"78901234567").unwrap(); + assert_eq!(cursor.stream_position().unwrap(), 28); + cursor.write_all(b"89").unwrap(); + assert_eq!(cursor.stream_position().unwrap(), 30); + assert!(cursor.write_all(b"0").is_err()); + + assert_eq!(cursor.seek(io::SeekFrom::Start(5)).unwrap(), 5); + assert_eq!(cursor.stream_position().unwrap(), 5); + cursor.write_all(b"A").unwrap(); + + assert_eq!(cursor.seek(io::SeekFrom::End(5)).unwrap(), 25); + assert_eq!(cursor.stream_position().unwrap(), 25); + cursor.write_all(b"B").unwrap(); + + assert_eq!(cursor.seek(io::SeekFrom::Current(-1)).unwrap(), 25); + assert_eq!(cursor.stream_position().unwrap(), 25); + cursor.write_all(b"C").unwrap(); + + assert_eq!(cursor.seek(io::SeekFrom::Current(1)).unwrap(), 27); + assert_eq!(cursor.stream_position().unwrap(), 27); + cursor.write_all(b"D").unwrap(); + + let buffer = cursor.into_buffer(); + + let mut cursor = buffer.into_cursor_readable(); + let mut data = [0; 30]; + + assert_eq!(cursor.stream_position().unwrap(), 0); + cursor.read_exact(&mut data[0..7]).unwrap(); + assert_eq!(cursor.stream_position().unwrap(), 7); + assert_eq!(&data[0..7], b"01234A6"); + cursor.read_exact(&mut data[0..5]).unwrap(); + assert_eq!(cursor.stream_position().unwrap(), 12); + assert_eq!(&data[0..5], b"78901"); + cursor.read_exact(&mut data[0..10]).unwrap(); + assert_eq!(cursor.stream_position().unwrap(), 22); + assert_eq!(&data[0..10], b"2345678901"); + cursor.read_exact(&mut data[0..8]).unwrap(); + assert_eq!(cursor.stream_position().unwrap(), 30); + assert_eq!(&data[0..8], b"234C6D89"); + assert!(cursor.read_exact(&mut data[0..1]).is_err()); + + assert_eq!(cursor.seek(io::SeekFrom::Start(5)).unwrap(), 5); + assert_eq!(cursor.stream_position().unwrap(), 5); + cursor.read_exact(&mut data[0..1]).unwrap(); + assert_eq!(&data[0..1], b"A"); + + assert_eq!(cursor.seek(io::SeekFrom::End(5)).unwrap(), 25); + assert_eq!(cursor.stream_position().unwrap(), 25); + cursor.read_exact(&mut data[0..1]).unwrap(); + assert_eq!(&data[0..1], b"C"); + + assert_eq!(cursor.seek(io::SeekFrom::Current(-1)).unwrap(), 25); + assert_eq!(cursor.stream_position().unwrap(), 25); + cursor.read_exact(&mut data[0..1]).unwrap(); + assert_eq!(&data[0..1], b"C"); + + assert_eq!(cursor.seek(io::SeekFrom::Current(1)).unwrap(), 27); + assert_eq!(cursor.stream_position().unwrap(), 27); + cursor.read_exact(&mut data[0..1]).unwrap(); + assert_eq!(&data[0..1], b"D"); + } + + #[test] + fn test_buffer_cursor_ref() { + use std::io::{self, Read, Seek, Write}; + + ::init().unwrap(); + + let mut buffer = Buffer::new(); + { + let buffer = buffer.get_mut().unwrap(); + buffer.append_memory(::Memory::from_mut_slice(vec![0; 5])); + buffer.append_memory(::Memory::from_mut_slice(vec![0; 5])); + buffer.append_memory(::Memory::from_mut_slice(vec![0; 5])); + buffer.append_memory(::Memory::from_mut_slice(vec![0; 5])); + buffer.append_memory(::Memory::from_mut_slice(vec![0; 10])); + } + + assert!(buffer.is_all_memory_writable()); + assert_eq!(buffer.n_memory(), 5); + assert_eq!(buffer.get_size(), 30); + + { + let buffer = buffer.get_mut().unwrap(); + + let mut cursor = buffer.as_cursor_writable().unwrap(); + assert_eq!(cursor.stream_position().unwrap(), 0); + cursor.write_all(b"01234567").unwrap(); + assert_eq!(cursor.stream_position().unwrap(), 8); + cursor.write_all(b"890123").unwrap(); + assert_eq!(cursor.stream_position().unwrap(), 14); + cursor.write_all(b"456").unwrap(); + assert_eq!(cursor.stream_position().unwrap(), 17); + cursor.write_all(b"78901234567").unwrap(); + assert_eq!(cursor.stream_position().unwrap(), 28); + cursor.write_all(b"89").unwrap(); + assert_eq!(cursor.stream_position().unwrap(), 30); + assert!(cursor.write_all(b"0").is_err()); + + assert_eq!(cursor.seek(io::SeekFrom::Start(5)).unwrap(), 5); + assert_eq!(cursor.stream_position().unwrap(), 5); + cursor.write_all(b"A").unwrap(); + + assert_eq!(cursor.seek(io::SeekFrom::End(5)).unwrap(), 25); + assert_eq!(cursor.stream_position().unwrap(), 25); + cursor.write_all(b"B").unwrap(); + + assert_eq!(cursor.seek(io::SeekFrom::Current(-1)).unwrap(), 25); + assert_eq!(cursor.stream_position().unwrap(), 25); + cursor.write_all(b"C").unwrap(); + + assert_eq!(cursor.seek(io::SeekFrom::Current(1)).unwrap(), 27); + assert_eq!(cursor.stream_position().unwrap(), 27); + cursor.write_all(b"D").unwrap(); + } + + let mut cursor = buffer.as_cursor_readable(); + let mut data = [0; 30]; + + assert_eq!(cursor.stream_position().unwrap(), 0); + cursor.read_exact(&mut data[0..7]).unwrap(); + assert_eq!(cursor.stream_position().unwrap(), 7); + assert_eq!(&data[0..7], b"01234A6"); + cursor.read_exact(&mut data[0..5]).unwrap(); + assert_eq!(cursor.stream_position().unwrap(), 12); + assert_eq!(&data[0..5], b"78901"); + cursor.read_exact(&mut data[0..10]).unwrap(); + assert_eq!(cursor.stream_position().unwrap(), 22); + assert_eq!(&data[0..10], b"2345678901"); + cursor.read_exact(&mut data[0..8]).unwrap(); + assert_eq!(cursor.stream_position().unwrap(), 30); + assert_eq!(&data[0..8], b"234C6D89"); + assert!(cursor.read_exact(&mut data[0..1]).is_err()); + + assert_eq!(cursor.seek(io::SeekFrom::Start(5)).unwrap(), 5); + assert_eq!(cursor.stream_position().unwrap(), 5); + cursor.read_exact(&mut data[0..1]).unwrap(); + assert_eq!(&data[0..1], b"A"); + + assert_eq!(cursor.seek(io::SeekFrom::End(5)).unwrap(), 25); + assert_eq!(cursor.stream_position().unwrap(), 25); + cursor.read_exact(&mut data[0..1]).unwrap(); + assert_eq!(&data[0..1], b"C"); + + assert_eq!(cursor.seek(io::SeekFrom::Current(-1)).unwrap(), 25); + assert_eq!(cursor.stream_position().unwrap(), 25); + cursor.read_exact(&mut data[0..1]).unwrap(); + assert_eq!(&data[0..1], b"C"); + + assert_eq!(cursor.seek(io::SeekFrom::Current(1)).unwrap(), 27); + assert_eq!(cursor.stream_position().unwrap(), 27); + cursor.read_exact(&mut data[0..1]).unwrap(); + assert_eq!(&data[0..1], b"D"); + } +} diff --git a/gstreamer/src/lib.rs b/gstreamer/src/lib.rs index 307b17a5c..c61028850 100644 --- a/gstreamer/src/lib.rs +++ b/gstreamer/src/lib.rs @@ -126,9 +126,10 @@ pub use meta::ReferenceTimestampMeta; pub use meta::{Meta, MetaAPI, MetaRef, MetaRefMut, ParentBufferMeta}; pub mod buffer; pub use buffer::{ - Buffer, BufferCursor, BufferCursorRef, BufferMap, BufferRef, MappedBuffer, BUFFER_COPY_ALL, - BUFFER_COPY_METADATA, + Buffer, BufferMap, BufferRef, MappedBuffer, BUFFER_COPY_ALL, BUFFER_COPY_METADATA, }; +mod buffer_cursor; +pub use buffer_cursor::{BufferCursor, BufferRefCursor}; pub mod memory; pub use memory::{MappedMemory, Memory, MemoryMap, MemoryRef}; #[cfg(feature = "ser_de")]