WIP demuxer

This commit is contained in:
Sebastian Dröge 2016-11-24 16:29:43 +02:00
parent ed2bcf875a
commit 62d24efc04
10 changed files with 1583 additions and 3 deletions

View file

@ -11,6 +11,8 @@ libc = "0.2"
url = "1.1" url = "1.1"
bitflags = "0.7" bitflags = "0.7"
reqwest = "0.2" reqwest = "0.2"
nom = "1.2"
flavors = {git = "https://github.com/Geal/flavors.git"}
[build-dependencies] [build-dependencies]
gcc = "0.3" gcc = "0.3"

View file

@ -23,7 +23,7 @@ fn main() {
let gstbase = pkg_config::probe_library("gstreamer-base-1.0").unwrap(); let gstbase = pkg_config::probe_library("gstreamer-base-1.0").unwrap();
let includes = [gstreamer.include_paths, gstbase.include_paths]; let includes = [gstreamer.include_paths, gstbase.include_paths];
let files = ["src/plugin.c", "src/rssource.c", "src/rssink.c"]; let files = ["src/plugin.c", "src/rssource.c", "src/rssink.c", "src/rsdemuxer.c"];
let mut config = gcc::Config::new(); let mut config = gcc::Config::new();
config.include("src"); config.include("src");

View file

@ -81,6 +81,21 @@ impl Adapter {
assert_eq!(left, 0); assert_eq!(left, 0);
} }
pub fn peek_into(&self, data: &mut [u8]) -> Result<(), AdapterError> {
let size = data.len();
if self.size < size {
return Err(AdapterError::NotEnoughData);
}
if size == 0 {
return Ok(());
}
Self::copy_data(&self.deque, self.skip, data, size);
Ok(())
}
pub fn peek(&mut self, size: usize) -> Result<&[u8], AdapterError> { pub fn peek(&mut self, size: usize) -> Result<&[u8], AdapterError> {
if self.size < size { if self.size < size {
return Err(AdapterError::NotEnoughData); return Err(AdapterError::NotEnoughData);

View file

@ -241,6 +241,19 @@ impl Buffer {
} }
} }
pub unsafe fn into_ptr(mut self) -> *mut c_void {
extern "C" {
fn gst_mini_object_ref(obj: *mut c_void) -> *mut c_void;
};
if !self.owned {
gst_mini_object_ref(self.raw);
}
self.owned = false;
self.raw
}
pub fn is_writable(&self) -> bool { pub fn is_writable(&self) -> bool {
extern "C" { extern "C" {
fn gst_mini_object_is_writable(obj: *const c_void) -> GBoolean; fn gst_mini_object_is_writable(obj: *const c_void) -> GBoolean;

463
src/flvdemux.rs Normal file
View file

@ -0,0 +1,463 @@
// Copyright (C) 2016 Sebastian Dröge <sebastian@centricular.com>
//
// This library is free software; you can redistribute it and/or
// modify it under the terms of the GNU Library General Public
// License as published by the Free Software Foundation; either
// version 2 of the License, or (at your option) any later version.
//
// This library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
// Library General Public License for more details.
//
// You should have received a copy of the GNU Library General Public
// License along with this library; if not, write to the
// Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
// Boston, MA 02110-1301, USA.
use std::cmp;
use nom;
use nom::IResult;
use flavors::parser as flavors;
use error::*;
use rsdemuxer::*;
use buffer::*;
use adapter::*;
#[derive(Debug)]
enum StreamingState {
Stopped,
Started {
adapter: Adapter,
stream_state: StreamState,
},
}
#[derive(Debug)]
enum StreamState {
NeedHeader,
Initialized {
header: flavors::Header,
initialized_state: InitializedState,
},
}
#[derive(Debug)]
enum InitializedState {
Skipping { skip_left: u32 },
Setup { setup_state: SetupState },
}
#[derive(Debug)]
struct SetupState {
audio: Option<flavors::AudioDataHeader>,
video: Option<flavors::VideoDataHeader>, /* TODO: parse and store various audio/video metadata from ScriptDataObject */
got_all_streams: bool,
last_position: Option<u64>,
}
#[derive(Debug)]
pub struct FlvDemux {
streaming_state: StreamingState,
}
impl FlvDemux {
pub fn new() -> FlvDemux {
FlvDemux { streaming_state: StreamingState::Stopped }
}
pub fn new_boxed() -> Box<Demuxer> {
Box::new(FlvDemux::new())
}
fn handle_script_tag(adapter: &mut Adapter,
header: &flavors::Header,
setup_state: &mut SetupState,
tag_header: &flavors::TagHeader)
-> Result<HandleBufferResult, FlowError> {
adapter.flush((15 + tag_header.data_size) as usize).unwrap();
Ok(HandleBufferResult::Again)
}
fn update_audio_stream(header: &flavors::Header,
setup_state: &mut SetupState,
data_header: flavors::AudioDataHeader)
-> Result<HandleBufferResult, FlowError> {
let stream_changed = match setup_state.audio {
None => true,
Some(flavors::AudioDataHeader { sound_format, sound_rate, sound_size, sound_type })
if sound_format != data_header.sound_format ||
sound_rate != data_header.sound_rate ||
sound_size != data_header.sound_size ||
sound_type != data_header.sound_type => true,
_ => false,
};
if stream_changed {
match data_header {
flavors::AudioDataHeader { sound_format: flavors::SoundFormat::MP3, .. } => {
let format = String::from("audio/mpeg, mpegversion=1, layer=3");
let new_stream = setup_state.audio == None;
setup_state.audio = Some(data_header);
let stream = Stream::new(0, format, String::from("audio"));
if new_stream {
return Ok(HandleBufferResult::StreamAdded(stream));
} else {
return Ok(HandleBufferResult::StreamChanged(stream));
}
}
_ => {
unimplemented!();
}
}
}
setup_state.audio = Some(data_header);
if !setup_state.got_all_streams &&
(header.video && setup_state.video != None || !header.video) {
setup_state.got_all_streams = true;
return Ok(HandleBufferResult::HaveAllStreams);
}
Ok(HandleBufferResult::Again)
}
fn handle_audio_tag(adapter: &mut Adapter,
header: &flavors::Header,
setup_state: &mut SetupState,
tag_header: &flavors::TagHeader,
data_header: flavors::AudioDataHeader)
-> Result<HandleBufferResult, FlowError> {
let res = Self::update_audio_stream(header, setup_state, data_header);
match res {
Ok(HandleBufferResult::Again) => (),
_ => return res,
}
if adapter.get_available() < (15 + tag_header.data_size) as usize {
return Ok(HandleBufferResult::NeedMoreData);
}
adapter.flush(16).unwrap();
if tag_header.data_size == 0 {
return Ok(HandleBufferResult::Again);
}
let mut buffer = adapter.get_buffer((tag_header.data_size - 1) as usize)
.unwrap();
buffer.set_pts(Some((tag_header.timestamp as u64) * 1000 * 1000))
.unwrap();
Ok(HandleBufferResult::BufferForStream(0, buffer))
}
fn update_video_stream(header: &flavors::Header,
setup_state: &mut SetupState,
data_header: flavors::VideoDataHeader)
-> Result<HandleBufferResult, FlowError> {
let stream_changed = match setup_state.video {
None => true,
Some(flavors::VideoDataHeader { codec_id, .. }) if codec_id != data_header.codec_id => {
true
}
_ => false,
};
if stream_changed {
match data_header {
flavors::VideoDataHeader { codec_id: flavors::CodecId::VP6, .. } => {
let format = String::from("video/x-vp6-flash");
let new_stream = setup_state.video == None;
setup_state.video = Some(data_header);
let stream = Stream::new(1, format, String::from("video"));
if new_stream {
return Ok(HandleBufferResult::StreamAdded(stream));
} else {
return Ok(HandleBufferResult::StreamChanged(stream));
}
}
_ => {
unimplemented!();
}
}
}
setup_state.video = Some(data_header);
if !setup_state.got_all_streams &&
(header.audio && setup_state.audio != None || !header.audio) {
setup_state.got_all_streams = true;
return Ok(HandleBufferResult::HaveAllStreams);
}
Ok(HandleBufferResult::Again)
}
fn handle_video_tag(adapter: &mut Adapter,
header: &flavors::Header,
setup_state: &mut SetupState,
tag_header: &flavors::TagHeader,
data_header: flavors::VideoDataHeader)
-> Result<HandleBufferResult, FlowError> {
let res = Self::update_video_stream(header, setup_state, data_header);
match res {
Ok(HandleBufferResult::Again) => (),
_ => return res,
}
if adapter.get_available() < (15 + tag_header.data_size) as usize {
return Ok(HandleBufferResult::NeedMoreData);
}
let video = setup_state.video.as_ref().unwrap();
let is_keyframe = video.frame_type == flavors::FrameType::Key;
adapter.flush(16).unwrap();
let offset = if video.codec_id == flavors::CodecId::VP6 ||
video.codec_id == flavors::CodecId::VP6A {
1
} else {
0
};
if tag_header.data_size == 0 {
return Ok(HandleBufferResult::Again);
}
if tag_header.data_size < offset {
adapter.flush((tag_header.data_size - 1) as usize).unwrap();
return Ok(HandleBufferResult::Again);
}
if offset > 0 {
adapter.flush(offset as usize).unwrap();
}
let mut buffer = adapter.get_buffer((tag_header.data_size - 1 - offset) as usize)
.unwrap();
if !is_keyframe {
buffer.set_flags(BUFFER_FLAG_DELTA_UNIT).unwrap();
}
buffer.set_dts(Some((tag_header.timestamp as u64) * 1000 * 1000))
.unwrap();
Ok(HandleBufferResult::BufferForStream(1, buffer))
}
fn update_setup_state(adapter: &mut Adapter,
header: &flavors::Header,
setup_state: &mut SetupState)
-> Result<HandleBufferResult, FlowError> {
if adapter.get_available() < 16 {
return Ok(HandleBufferResult::NeedMoreData);
}
let mut data = [0u8; 16];
adapter.peek_into(&mut data).unwrap();
match nom::be_u32(&data[0..4]) {
IResult::Error(_) |
IResult::Incomplete(_) => {
unimplemented!();
}
IResult::Done(_, previous_size) => {
();
}
}
let tag_header = match flavors::tag_header(&data[4..]) {
IResult::Error(_) |
IResult::Incomplete(_) => {
unimplemented!();
}
IResult::Done(_, tag_header) => tag_header,
};
let res = match tag_header.tag_type {
flavors::TagType::Script => {
Self::handle_script_tag(adapter, header, setup_state, &tag_header)
}
flavors::TagType::Audio => {
let data_header = match flavors::audio_data_header(&data[15..]) {
IResult::Error(_) |
IResult::Incomplete(_) => {
unimplemented!();
}
IResult::Done(_, data_header) => data_header,
};
Self::handle_audio_tag(adapter, header, setup_state, &tag_header, data_header)
}
flavors::TagType::Video => {
let data_header = match flavors::video_data_header(&data[15..]) {
IResult::Error(_) |
IResult::Incomplete(_) => {
unimplemented!();
}
IResult::Done(_, data_header) => data_header,
};
Self::handle_video_tag(adapter, header, setup_state, &tag_header, data_header)
}
};
if let Ok(HandleBufferResult::BufferForStream(_, ref buffer)) = res {
if let Some(pts) = buffer.get_pts() {
setup_state.last_position =
setup_state.last_position.map(|last| cmp::max(last, pts)).or_else(|| Some(pts));
} else if let Some(dts) = buffer.get_dts() {
setup_state.last_position =
setup_state.last_position.map(|last| cmp::max(last, dts)).or_else(|| Some(dts));
}
}
res
}
fn update_initialized_state(adapter: &mut Adapter,
header: &flavors::Header,
initialized_state: &mut InitializedState)
-> Result<HandleBufferResult, FlowError> {
match *initialized_state {
InitializedState::Skipping { skip_left: 0 } => {
*initialized_state = InitializedState::Setup {
setup_state: SetupState {
audio: None,
video: None,
got_all_streams: false,
last_position: None,
},
};
Ok(HandleBufferResult::Again)
}
InitializedState::Skipping { ref mut skip_left } => {
let skip = cmp::min(adapter.get_available(), *skip_left as usize);
adapter.flush(skip).unwrap();
*skip_left -= skip as u32;
Ok(HandleBufferResult::Again)
}
InitializedState::Setup { ref mut setup_state } => {
Self::update_setup_state(adapter, header, setup_state)
}
}
}
fn update_state(adapter: &mut Adapter,
stream_state: &mut StreamState)
-> Result<HandleBufferResult, FlowError> {
match *stream_state {
StreamState::NeedHeader => {
while adapter.get_available() >= 9 {
let mut data = [0u8; 9];
adapter.peek_into(&mut data).unwrap();
match flavors::header(&data) {
IResult::Error(_) |
IResult::Incomplete(_) => {
// fall through
}
IResult::Done(_, mut header) => {
if header.offset < 9 {
header.offset = 9;
}
let skip = header.offset - 9;
adapter.flush(9).unwrap();
*stream_state = StreamState::Initialized {
header: header,
initialized_state: InitializedState::Skipping { skip_left: skip },
};
return Ok(HandleBufferResult::Again);
}
}
adapter.flush(1).unwrap();
}
Ok(HandleBufferResult::NeedMoreData)
}
StreamState::Initialized { ref header, ref mut initialized_state } => {
Self::update_initialized_state(adapter, header, initialized_state)
}
}
}
}
impl Demuxer for FlvDemux {
fn start(&mut self,
_upstream_size: Option<u64>,
_random_access: bool)
-> Result<(), ErrorMessage> {
self.streaming_state = StreamingState::Started {
adapter: Adapter::new(),
stream_state: StreamState::NeedHeader,
};
Ok(())
}
fn stop(&mut self) -> Result<(), ErrorMessage> {
self.streaming_state = StreamingState::Stopped;
Ok(())
}
fn seek(&mut self, start: u64, stop: Option<u64>) -> Result<SeekResult, ErrorMessage> {
unimplemented!();
}
fn handle_buffer(&mut self, buffer: Option<Buffer>) -> Result<HandleBufferResult, FlowError> {
let (adapter, stream_state) = match self.streaming_state {
StreamingState::Started { ref mut adapter, ref mut stream_state } => {
(adapter, stream_state)
}
StreamingState::Stopped => {
unreachable!();
}
};
if let Some(buffer) = buffer {
adapter.push(buffer);
}
Self::update_state(adapter, stream_state)
}
fn end_of_stream(&mut self) -> Result<(), ErrorMessage> {
// nothing to do here, all data we have left is incomplete
Ok(())
}
fn is_seekable(&self) -> bool {
false
}
fn get_position(&self) -> Option<u64> {
if let StreamingState::Started {
stream_state: StreamState::Initialized {
initialized_state: InitializedState::Setup {
setup_state: SetupState {
last_position, ..
}, ..
}, ..
}, ..
} = self.streaming_state {
return last_position;
}
None
}
fn get_duration(&self) -> Option<u64> {
None
}
}

View file

@ -23,6 +23,9 @@ extern crate url;
extern crate reqwest; extern crate reqwest;
#[macro_use] #[macro_use]
extern crate bitflags; extern crate bitflags;
#[macro_use]
extern crate nom;
extern crate flavors;
#[macro_use] #[macro_use]
pub mod utils; pub mod utils;
@ -35,11 +38,14 @@ pub mod rssink;
pub mod rsfilesrc; pub mod rsfilesrc;
pub mod rshttpsrc; pub mod rshttpsrc;
pub mod rsfilesink; pub mod rsfilesink;
pub mod rsdemuxer;
pub mod flvdemux;
use utils::*; use utils::*;
use rsfilesrc::FileSrc; use rsfilesrc::FileSrc;
use rshttpsrc::HttpSrc; use rshttpsrc::HttpSrc;
use rsfilesink::FileSink; use rsfilesink::FileSink;
use flvdemux::FlvDemux;
use std::os::raw::c_void; use std::os::raw::c_void;
use libc::c_char; use libc::c_char;
@ -171,3 +177,63 @@ pub unsafe extern "C" fn sinks_register(plugin: *const c_void) -> GBoolean {
GBoolean::True GBoolean::True
} }
unsafe fn demuxer_register(plugin: *const c_void,
name: &str,
long_name: &str,
description: &str,
classification: &str,
author: &str,
rank: i32,
create_instance: *const c_void,
input_format: &str,
output_formats: &str) {
extern "C" {
fn gst_rs_demuxer_register(plugin: *const c_void,
name: *const c_char,
long_name: *const c_char,
description: *const c_char,
classification: *const c_char,
author: *const c_char,
rank: i32,
create_instance: *const c_void,
input_format: *const c_char,
output_formats: *const c_char)
-> GBoolean;
}
let cname = CString::new(name).unwrap();
let clong_name = CString::new(long_name).unwrap();
let cdescription = CString::new(description).unwrap();
let cclassification = CString::new(classification).unwrap();
let cauthor = CString::new(author).unwrap();
let cinput_format = CString::new(input_format).unwrap();
let coutput_formats = CString::new(output_formats).unwrap();
gst_rs_demuxer_register(plugin,
cname.as_ptr(),
clong_name.as_ptr(),
cdescription.as_ptr(),
cclassification.as_ptr(),
cauthor.as_ptr(),
rank,
create_instance,
cinput_format.as_ptr(),
coutput_formats.as_ptr());
}
#[no_mangle]
pub unsafe extern "C" fn demuxers_register(plugin: *const c_void) -> GBoolean {
demuxer_register(plugin,
"rsflvdemux",
"FLV Demuxer",
"Demuxes FLV Streams",
"Codec/Demuxer",
"Sebastian Dröge <sebastian@centricular.com>",
256 + 100,
FlvDemux::new_boxed as *const c_void,
"video/x-flv",
"ANY");
GBoolean::True
}

View file

@ -20,6 +20,7 @@
#include "rssource.h" #include "rssource.h"
#include "rssink.h" #include "rssink.h"
#include "rsdemuxer.h"
static gboolean static gboolean
plugin_init (GstPlugin * plugin) plugin_init (GstPlugin * plugin)
@ -30,6 +31,9 @@ plugin_init (GstPlugin * plugin)
if (!gst_rs_sink_plugin_init (plugin)) if (!gst_rs_sink_plugin_init (plugin))
return FALSE; return FALSE;
if (!gst_rs_demuxer_plugin_init (plugin))
return FALSE;
return TRUE; return TRUE;
} }
@ -102,13 +106,13 @@ gst_rs_buffer_set_offset_end (GstBuffer * buffer, guint64 offset_end)
} }
GstBufferFlags GstBufferFlags
gst_rs_buffer_get_buffer_flags (GstBuffer * buffer) gst_rs_buffer_get_flags (GstBuffer * buffer)
{ {
return GST_MINI_OBJECT_FLAGS (buffer); return GST_MINI_OBJECT_FLAGS (buffer);
} }
void void
gst_rs_buffer_set_buffer_flags (GstBuffer * buffer, GstBufferFlags flags) gst_rs_buffer_set_flags (GstBuffer * buffer, GstBufferFlags flags)
{ {
GST_MINI_OBJECT_FLAGS (buffer) = flags; GST_MINI_OBJECT_FLAGS (buffer) = flags;
} }

522
src/rsdemuxer.c Normal file
View file

@ -0,0 +1,522 @@
/* Copyright (C) 2016 Sebastian Dröge <sebastian@centricular.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
* Boston, MA 02110-1301, USA.
*/
#include "rsdemuxer.h"
#include <string.h>
#include <stdint.h>
typedef struct
{
gchar *long_name;
gchar *description;
gchar *classification;
gchar *author;
void *create_instance;
gchar *input_format;
gchar *output_formats;
} ElementData;
static GHashTable *demuxers;
/* Declarations for Rust code */
extern gboolean demuxers_register (void *plugin);
extern void *demuxer_new (GstRsDemuxer * demuxer, void *create_instance);
extern void demuxer_drop (void *rsdemuxer);
extern gboolean demuxer_start (void *rsdemuxer, uint64_t upstream_size,
gboolean random_access);
extern gboolean demuxer_stop (void *rsdemuxer);
extern gboolean demuxer_is_seekable (void *rsdemuxer);
extern gboolean demuxer_get_position (void *rsdemuxer, uint64_t * position);
extern gboolean demuxer_get_duration (void *rsdemuxer, uint64_t * duration);
extern gboolean demuxer_seek (void *rsdemuxer, uint64_t start, uint64_t stop,
uint64_t * offset);
extern GstFlowReturn demuxer_handle_buffer (void *rsdemuxer,
GstBuffer * buffer);
extern void demuxer_end_of_stream (void *rsdemuxer);
extern void cstring_drop (void *str);
GST_DEBUG_CATEGORY_STATIC (gst_rs_demuxer_debug);
#define GST_CAT_DEFAULT gst_rs_demuxer_debug
static void gst_rs_demuxer_finalize (GObject * object);
static gboolean gst_rs_demuxer_sink_activate (GstPad * pad, GstObject * parent);
static gboolean gst_rs_demuxer_sink_activate_mode (GstPad * pad,
GstObject * parent, GstPadMode mode, gboolean active);
static GstFlowReturn gst_rs_demuxer_sink_chain (GstPad * pad,
GstObject * parent, GstBuffer * buf);
static gboolean gst_rs_demuxer_sink_event (GstPad * pad, GstObject * parent,
GstEvent * event);
static gboolean gst_rs_demuxer_src_query (GstPad * pad, GstObject * parent,
GstQuery * query);
static gboolean gst_rs_demuxer_src_event (GstPad * pad, GstObject * parent,
GstEvent * event);
static GstStateChangeReturn gst_rs_demuxer_change_state (GstElement * element,
GstStateChange transition);
static void gst_rs_demuxer_loop (GstRsDemuxer * demuxer);
static GObjectClass *parent_class;
static void
gst_rs_demuxer_class_init (GstRsDemuxerClass * klass)
{
GObjectClass *gobject_class;
GstElementClass *gstelement_class;
ElementData *data = g_hash_table_lookup (demuxers,
GSIZE_TO_POINTER (G_TYPE_FROM_CLASS (klass)));
GstCaps *caps;
GstPadTemplate *templ;
g_assert (data != NULL);
gobject_class = G_OBJECT_CLASS (klass);
gstelement_class = GST_ELEMENT_CLASS (klass);
gobject_class->finalize = gst_rs_demuxer_finalize;
gstelement_class->change_state = gst_rs_demuxer_change_state;
gst_element_class_set_static_metadata (gstelement_class,
data->long_name, data->classification, data->description, data->author);
caps = gst_caps_from_string (data->input_format);
templ = gst_pad_template_new ("sink", GST_PAD_SINK, GST_PAD_ALWAYS, caps);
gst_caps_unref (caps);
gst_element_class_add_pad_template (gstelement_class, templ);
caps = gst_caps_from_string (data->output_formats);
templ = gst_pad_template_new ("src_%u", GST_PAD_SRC, GST_PAD_SOMETIMES, caps);
gst_caps_unref (caps);
gst_element_class_add_pad_template (gstelement_class, templ);
}
static void
gst_rs_demuxer_init (GstRsDemuxer * demuxer, GstRsDemuxerClass * klass)
{
ElementData *data = g_hash_table_lookup (demuxers,
GSIZE_TO_POINTER (G_TYPE_FROM_CLASS (klass)));
GstPadTemplate *templ;
g_assert (data != NULL);
demuxer->instance = demuxer_new (demuxer, data->create_instance);
templ =
gst_element_class_get_pad_template (GST_ELEMENT_CLASS (klass), "sink");
demuxer->sinkpad = gst_pad_new_from_template (templ, "sink");
gst_pad_set_activate_function (demuxer->sinkpad,
gst_rs_demuxer_sink_activate);
gst_pad_set_activatemode_function (demuxer->sinkpad,
gst_rs_demuxer_sink_activate_mode);
gst_pad_set_chain_function (demuxer->sinkpad, gst_rs_demuxer_sink_chain);
gst_pad_set_event_function (demuxer->sinkpad, gst_rs_demuxer_sink_event);
gst_element_add_pad (GST_ELEMENT (demuxer), demuxer->sinkpad);
demuxer->flow_combiner = gst_flow_combiner_new ();
}
static void
gst_rs_demuxer_finalize (GObject * object)
{
GstRsDemuxer *demuxer = GST_RS_DEMUXER (object);
gst_flow_combiner_free (demuxer->flow_combiner);
demuxer_drop (demuxer->instance);
G_OBJECT_CLASS (parent_class)->finalize (object);
}
static gboolean
gst_rs_demuxer_sink_activate (GstPad * pad, GstObject * parent)
{
GstRsDemuxer *demuxer = GST_RS_DEMUXER (parent);
GstQuery *query;
GstPadMode mode = GST_PAD_MODE_PUSH;
query = gst_query_new_scheduling ();
if (!gst_pad_peer_query (pad, query)) {
gst_query_unref (query);
return FALSE;
}
// TODO
//if (gst_query_has_scheduling_mode_with_flags (query, GST_PAD_MODE_PULL, GST_SCHEDULING_FLAG_SEEKABLE))
// mode = GST_PAD_MODE_PULL;
gst_query_unref (query);
demuxer->upstream_size = -1;
query = gst_query_new_duration (GST_FORMAT_BYTES);
if (gst_pad_peer_query (pad, query)) {
gst_query_parse_duration (query, NULL, &demuxer->upstream_size);
}
gst_query_unref (query);
return gst_pad_activate_mode (pad, mode, TRUE);
}
static gboolean
gst_rs_demuxer_sink_activate_mode (GstPad * pad,
GstObject * parent, GstPadMode mode, gboolean active)
{
GstRsDemuxer *demuxer = GST_RS_DEMUXER (parent);
gboolean res = TRUE;
if (active) {
if (!demuxer_start (demuxer->instance, demuxer->upstream_size,
mode == GST_PAD_MODE_PULL ? TRUE : FALSE)) {
res = FALSE;
goto out;
}
if (mode == GST_PAD_MODE_PULL)
gst_pad_start_task (pad, (GstTaskFunction) gst_rs_demuxer_loop, demuxer,
NULL);
} else {
if (mode == GST_PAD_MODE_PULL)
gst_pad_stop_task (pad);
}
out:
return res;
}
static GstFlowReturn
gst_rs_demuxer_sink_chain (GstPad * pad, GstObject * parent, GstBuffer * buf)
{
GstRsDemuxer *demuxer = GST_RS_DEMUXER (parent);
return demuxer_handle_buffer (demuxer->instance, buf);
}
static gboolean
gst_rs_demuxer_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
{
GstRsDemuxer *demuxer = GST_RS_DEMUXER (parent);
gboolean res = FALSE;
switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_SEGMENT:{
// TODO
res = TRUE;
gst_event_unref (event);
break;
}
case GST_EVENT_EOS:
demuxer_end_of_stream (demuxer->instance);
res = gst_pad_event_default (pad, parent, event);
break;
default:
res = gst_pad_event_default (pad, parent, event);
break;
}
return res;
}
static gboolean
gst_rs_demuxer_src_query (GstPad * pad, GstObject * parent, GstQuery * query)
{
GstRsDemuxer *demuxer = GST_RS_DEMUXER (parent);
gboolean res = FALSE;
switch (GST_QUERY_TYPE (query)) {
case GST_QUERY_POSITION:{
GstFormat format;
gst_query_parse_position (query, &format, NULL);
if (format == GST_FORMAT_TIME) {
gint64 position;
if (demuxer_get_position (demuxer->instance, &position)) {
gst_query_set_position (query, format, position);
res = TRUE;
} else {
res = FALSE;
}
}
break;
}
case GST_QUERY_DURATION:{
GstFormat format;
gst_query_parse_duration (query, &format, NULL);
if (format == GST_FORMAT_TIME) {
gint64 duration;
if (demuxer_get_duration (demuxer->instance, &duration)) {
gst_query_set_duration (query, format, duration);
res = TRUE;
} else {
res = FALSE;
}
}
break;
}
default:
res = gst_pad_query_default (pad, parent, query);
break;
}
return res;
}
static gboolean
gst_rs_demuxer_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
{
GstRsDemuxer *demuxer = GST_RS_DEMUXER (parent);
gboolean res = FALSE;
switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_SEEK:{
// TODO
g_assert_not_reached ();
gst_event_unref (event);
break;
}
default:
res = gst_pad_event_default (pad, parent, event);
break;
}
return res;
}
static GstStateChangeReturn
gst_rs_demuxer_change_state (GstElement * element, GstStateChange transition)
{
GstRsDemuxer *demuxer = GST_RS_DEMUXER (element);
GstStateChangeReturn result;
switch (transition) {
case GST_STATE_CHANGE_READY_TO_PAUSED:
demuxer->offset = 0;
gst_segment_init (&demuxer->segment, GST_FORMAT_TIME);
demuxer->group_id = gst_util_group_id_next ();
demuxer->segment_seqnum = gst_util_seqnum_next ();
break;
default:
break;
}
if (result == GST_STATE_CHANGE_FAILURE)
return result;
result = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
if (result == GST_STATE_CHANGE_FAILURE)
return result;
switch (transition) {
case GST_STATE_CHANGE_PAUSED_TO_READY:{
guint i;
/* Ignore stop failures */
demuxer_stop (demuxer->instance);
gst_flow_combiner_clear (demuxer->flow_combiner);
for (i = 0; i < demuxer->n_srcpads; i++)
gst_element_remove_pad (GST_ELEMENT (demuxer), demuxer->srcpads[i]);
memset (demuxer->srcpads, 0, sizeof (demuxer->srcpads));
break;
}
default:
break;
}
return result;
}
static void
gst_rs_demuxer_loop (GstRsDemuxer * demuxer)
{
// TODO
g_assert_not_reached ();
}
void
gst_rs_demuxer_add_stream (GstRsDemuxer * demuxer, guint32 index,
const gchar * format, const gchar * stream_id)
{
GstPad *pad;
GstPadTemplate *templ;
GstCaps *caps;
GstEvent *event;
gchar *name, *full_stream_id;
g_assert (demuxer->srcpads[index] == NULL);
g_assert (demuxer->n_srcpads == index);
templ =
gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS (demuxer),
"src_%u");
name = g_strdup_printf ("src_%u", index);
pad = demuxer->srcpads[index] = gst_pad_new_from_template (templ, name);
g_free (name);
gst_pad_set_query_function (pad, gst_rs_demuxer_src_query);
gst_pad_set_event_function (pad, gst_rs_demuxer_src_event);
gst_pad_set_active (pad, TRUE);
full_stream_id =
gst_pad_create_stream_id (pad, GST_ELEMENT (demuxer), stream_id);
event = gst_event_new_stream_start (full_stream_id);
gst_event_set_group_id (event, demuxer->group_id);
g_free (full_stream_id);
gst_pad_push_event (pad, event);
caps = gst_caps_from_string (format);
event = gst_event_new_caps (caps);
gst_caps_unref (caps);
gst_pad_push_event (pad, event);
event = gst_event_new_segment (&demuxer->segment);
gst_event_set_seqnum (event, demuxer->segment_seqnum);
gst_pad_push_event (pad, event);
gst_flow_combiner_add_pad (demuxer->flow_combiner, pad);
gst_element_add_pad (GST_ELEMENT (demuxer), pad);
demuxer->n_srcpads++;
}
void
gst_rs_demuxer_added_all_streams (GstRsDemuxer * demuxer)
{
gst_element_no_more_pads (GST_ELEMENT (demuxer));
demuxer->group_id = gst_util_group_id_next ();
}
void
gst_rs_demuxer_stream_format_changed (GstRsDemuxer * demuxer, guint32 index,
const gchar * format)
{
GstCaps *caps;
GstEvent *event;
g_assert (demuxer->srcpads[index] != NULL);
caps = gst_caps_from_string (format);
event = gst_event_new_caps (caps);
gst_caps_unref (caps);
gst_pad_push_event (demuxer->srcpads[index], event);
}
void
gst_rs_demuxer_stream_eos (GstRsDemuxer * demuxer, guint32 index)
{
GstCaps *caps;
GstEvent *event;
g_assert (demuxer->srcpads[index] != NULL);
event = gst_event_new_eos ();
if (index == -1) {
gint i;
for (i = 0; i < demuxer->n_srcpads; i++)
gst_pad_push_event (demuxer->srcpads[i], gst_event_ref (event));
gst_event_unref (event);
} else {
gst_pad_push_event (demuxer->srcpads[index], event);
}
}
GstFlowReturn
gst_rs_demuxer_stream_push_buffer (GstRsDemuxer * demuxer, guint32 index,
GstBuffer * buffer)
{
GstFlowReturn res = GST_FLOW_OK;
g_assert (demuxer->srcpads[index] != NULL);
res = gst_pad_push (demuxer->srcpads[index], buffer);
res = gst_flow_combiner_update_flow (demuxer->flow_combiner, res);
return res;
}
void
gst_rs_demuxer_remove_all_streams (GstRsDemuxer * demuxer)
{
guint i;
gst_flow_combiner_clear (demuxer->flow_combiner);
for (i = 0; i < demuxer->n_srcpads; i++)
gst_element_remove_pad (GST_ELEMENT (demuxer), demuxer->srcpads[i]);
memset (demuxer->srcpads, 0, sizeof (demuxer->srcpads));
}
gboolean
gst_rs_demuxer_plugin_init (GstPlugin * plugin)
{
demuxers = g_hash_table_new (g_direct_hash, g_direct_equal);
GST_DEBUG_CATEGORY_INIT (gst_rs_demuxer_debug, "rsdemux", 0,
"rsdemux element");
parent_class = g_type_class_ref (GST_TYPE_ELEMENT);
return demuxers_register (plugin);
}
gboolean
gst_rs_demuxer_register (GstPlugin * plugin, const gchar * name,
const gchar * long_name, const gchar * description,
const gchar * classification, const gchar * author, GstRank rank,
void *create_instance, const gchar * input_format,
const gchar * output_formats)
{
GTypeInfo type_info = {
sizeof (GstRsDemuxerClass),
NULL,
NULL,
(GClassInitFunc) gst_rs_demuxer_class_init,
NULL,
NULL,
sizeof (GstRsDemuxer),
0,
(GInstanceInitFunc) gst_rs_demuxer_init
};
GType type;
gchar *type_name;
ElementData *data;
data = g_new0 (ElementData, 1);
data->long_name = g_strdup (long_name);
data->description = g_strdup (description);
data->classification = g_strdup (classification);
data->author = g_strdup (author);
data->create_instance = create_instance;
data->input_format = g_strdup (input_format);
data->output_formats = g_strdup (output_formats);
type_name = g_strconcat ("RsDemuxer-", name, NULL);
type = g_type_register_static (GST_TYPE_ELEMENT, type_name, &type_info, 0);
g_free (type_name);
g_hash_table_insert (demuxers, GSIZE_TO_POINTER (type), data);
if (!gst_element_register (plugin, name, rank, type))
return FALSE;
return TRUE;
}

62
src/rsdemuxer.h Normal file
View file

@ -0,0 +1,62 @@
/* Copyright (C) 2016 Sebastian Dröge <sebastian@centricular.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
* Boston, MA 02110-1301, USA.
*/
#ifndef __GST_RS_DEMUXER_H__
#define __GST_RS_DEMUXER_H__
#include <gst/gst.h>
#include <gst/base/base.h>
G_BEGIN_DECLS
#define GST_RS_DEMUXER(obj) \
((GstRsDemuxer *)obj)
#define GST_RS_DEMUXER_CLASS(klass) \
((GstRsDemuxerKlass *)klass)
typedef struct _GstRsDemuxer GstRsDemuxer;
typedef struct _GstRsDemuxerClass GstRsDemuxerClass;
struct _GstRsDemuxer {
GstElement element;
gpointer instance;
GstPad *sinkpad;
guint64 offset;
guint64 upstream_size;
GstPad *srcpads[32];
guint n_srcpads;
guint32 group_id;
GstSegment segment;
guint32 segment_seqnum;
GstFlowCombiner *flow_combiner;
};
struct _GstRsDemuxerClass {
GstElementClass parent_class;
};
G_GNUC_INTERNAL gboolean gst_rs_demuxer_plugin_init (GstPlugin * plugin);
G_END_DECLS
#endif /* __GST_RS_DEMUXER_H__ */

433
src/rsdemuxer.rs Normal file
View file

@ -0,0 +1,433 @@
// Copyright (C) 2016 Sebastian Dröge <sebastian@centricular.com>
//
// This library is free software; you can redistribute it and/or
// modify it under the terms of the GNU Library General Public
// License as published by the Free Software Foundation; either
// version 2 of the License, or (at your option) any later version.
//
// This library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
// Library General Public License for more details.
//
// You should have received a copy of the GNU Library General Public
// License along with this library; if not, write to the
// Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
// Boston, MA 02110-1301, USA.
use libc::c_char;
use std::os::raw::c_void;
use std::ffi::CString;
use std::panic::{self, AssertUnwindSafe};
use std::sync::Mutex;
use std::sync::atomic::{AtomicBool, Ordering};
use std::u32;
use std::u64;
use utils::*;
use error::*;
use buffer::*;
pub type StreamIndex = u32;
#[derive(Debug)]
pub enum SeekResult {
TooEarly,
Ok(u64),
Eos,
}
#[derive(Debug)]
pub enum HandleBufferResult {
NeedMoreData,
Again,
// NeedDataFromOffset(u64),
StreamAdded(Stream),
HaveAllStreams,
StreamChanged(Stream),
// StreamsAdded(Vec<Stream>), // Implies HaveAllStreams
// StreamsChanged(Vec<Stream>),
// TODO need something to replace/add new streams
// TODO should probably directly implement the GstStreams new world order
BufferForStream(StreamIndex, Buffer),
Eos(Option<StreamIndex>),
}
pub trait Demuxer {
fn start(&mut self,
upstream_size: Option<u64>,
random_access: bool)
-> Result<(), ErrorMessage>;
fn stop(&mut self) -> Result<(), ErrorMessage>;
fn seek(&mut self, start: u64, stop: Option<u64>) -> Result<SeekResult, ErrorMessage>;
fn handle_buffer(&mut self, buffer: Option<Buffer>) -> Result<HandleBufferResult, FlowError>;
fn end_of_stream(&mut self) -> Result<(), ErrorMessage>;
fn is_seekable(&self) -> bool;
fn get_position(&self) -> Option<u64>;
fn get_duration(&self) -> Option<u64>;
}
#[derive(Debug)]
pub struct Stream {
pub index: StreamIndex,
pub format: String,
pub stream_id: String,
}
impl Stream {
pub fn new(index: StreamIndex, format: String, stream_id: String) -> Stream {
Stream {
index: index,
format: format,
stream_id: stream_id,
}
}
}
pub struct DemuxerWrapper {
raw: *mut c_void,
demuxer: Mutex<Box<Demuxer>>,
panicked: AtomicBool,
}
impl DemuxerWrapper {
fn new(raw: *mut c_void, demuxer: Box<Demuxer>) -> DemuxerWrapper {
DemuxerWrapper {
raw: raw,
demuxer: Mutex::new(demuxer),
panicked: AtomicBool::new(false),
}
}
fn start(&self, upstream_size: u64, random_access: bool) -> bool {
let demuxer = &mut self.demuxer.lock().unwrap();
let upstream_size = if upstream_size == u64::MAX {
None
} else {
Some(upstream_size)
};
match demuxer.start(upstream_size, random_access) {
Ok(..) => true,
Err(ref msg) => {
self.post_message(msg);
false
}
}
}
fn stop(&self) -> bool {
let demuxer = &mut self.demuxer.lock().unwrap();
match demuxer.stop() {
Ok(..) => true,
Err(ref msg) => {
self.post_message(msg);
false
}
}
}
fn is_seekable(&self) -> bool {
let demuxer = &self.demuxer.lock().unwrap();
demuxer.is_seekable()
}
fn get_position(&self, position: &mut u64) -> GBoolean {
let demuxer = &self.demuxer.lock().unwrap();
match demuxer.get_position() {
None => {
*position = u64::MAX;
GBoolean::False
}
Some(pos) => {
*position = pos;
GBoolean::True
}
}
}
fn get_duration(&self, position: &mut u64) -> GBoolean {
let demuxer = &self.demuxer.lock().unwrap();
match demuxer.get_duration() {
None => {
*position = u64::MAX;
GBoolean::False
}
Some(pos) => {
*position = pos;
GBoolean::True
}
}
}
fn seek(&self, start: u64, stop: u64, offset: &mut u64) -> bool {
extern "C" {
fn gst_rs_demuxer_stream_eos(raw: *mut c_void, index: u32);
};
let stop = if stop == u64::MAX { None } else { Some(stop) };
let res = {
let mut demuxer = &mut self.demuxer.lock().unwrap();
match demuxer.seek(start, stop) {
Ok(res) => res,
Err(ref msg) => {
self.post_message(msg);
return false;
}
}
};
match res {
SeekResult::TooEarly => false,
SeekResult::Ok(off) => {
*offset = off;
true
}
SeekResult::Eos => {
*offset = u64::MAX;
unsafe {
gst_rs_demuxer_stream_eos(self.raw, u32::MAX);
}
true
}
}
}
fn handle_buffer(&self, buffer: Buffer) -> GstFlowReturn {
extern "C" {
fn gst_rs_demuxer_stream_eos(raw: *mut c_void, index: u32);
fn gst_rs_demuxer_add_stream(raw: *mut c_void,
index: u32,
format: *const c_char,
stream_id: *const c_char);
fn gst_rs_demuxer_added_all_streams(raw: *mut c_void);
// fn gst_rs_demuxer_remove_all_streams(raw: *mut c_void);
fn gst_rs_demuxer_stream_format_changed(raw: *mut c_void,
index: u32,
format: *const c_char);
fn gst_rs_demuxer_stream_push_buffer(raw: *mut c_void,
index: u32,
buffer: *mut c_void)
-> GstFlowReturn;
};
let mut res = {
let mut demuxer = &mut self.demuxer.lock().unwrap();
match demuxer.handle_buffer(Some(buffer)) {
Ok(res) => res,
Err(flow_error) => {
match flow_error {
FlowError::NotNegotiated(ref msg) |
FlowError::Error(ref msg) => self.post_message(msg),
_ => (),
}
return flow_error.to_native();
}
}
};
// Loop until AllEos, NeedMoreData or error when pushing downstream
loop {
match res {
HandleBufferResult::NeedMoreData => {
return GstFlowReturn::Ok;
}
HandleBufferResult::StreamAdded(stream) => {
let format_cstr = CString::new(stream.format.as_bytes()).unwrap();
let stream_id_cstr = CString::new(stream.stream_id.as_bytes()).unwrap();
unsafe {
gst_rs_demuxer_add_stream(self.raw,
stream.index,
format_cstr.as_ptr(),
stream_id_cstr.as_ptr());
}
}
HandleBufferResult::HaveAllStreams => unsafe {
gst_rs_demuxer_added_all_streams(self.raw);
},
HandleBufferResult::StreamChanged(stream) => {
let format_cstr = CString::new(stream.format.as_bytes()).unwrap();
unsafe {
gst_rs_demuxer_stream_format_changed(self.raw,
stream.index,
format_cstr.as_ptr());
}
}
HandleBufferResult::BufferForStream(index, buffer) => {
let flow_ret = unsafe {
gst_rs_demuxer_stream_push_buffer(self.raw, index, buffer.into_ptr())
};
if flow_ret != GstFlowReturn::Ok {
return flow_ret;
}
}
HandleBufferResult::Eos(index) => {
let index = index.unwrap_or(u32::MAX);
unsafe {
gst_rs_demuxer_stream_eos(self.raw, index);
}
return GstFlowReturn::Eos;
}
HandleBufferResult::Again => {
// nothing, just call again
}
};
res = {
let mut demuxer = &mut self.demuxer.lock().unwrap();
match demuxer.handle_buffer(None) {
Ok(res) => res,
Err(flow_error) => {
match flow_error {
FlowError::NotNegotiated(ref msg) |
FlowError::Error(ref msg) => self.post_message(msg),
_ => (),
}
return flow_error.to_native();
}
}
}
}
}
fn end_of_stream(&self) {
let mut demuxer = &mut self.demuxer.lock().unwrap();
match demuxer.end_of_stream() {
Ok(_) => (),
Err(ref msg) => {
self.post_message(msg);
}
}
}
fn post_message(&self, msg: &ErrorMessage) {
unsafe {
msg.post(self.raw);
}
}
}
#[no_mangle]
pub extern "C" fn demuxer_new(demuxer: *mut c_void,
create_instance: fn() -> Box<Demuxer>)
-> *mut DemuxerWrapper {
Box::into_raw(Box::new(DemuxerWrapper::new(demuxer, create_instance())))
}
#[no_mangle]
pub unsafe extern "C" fn demuxer_drop(ptr: *mut DemuxerWrapper) {
let _ = Box::from_raw(ptr);
}
#[no_mangle]
pub unsafe extern "C" fn demuxer_start(ptr: *const DemuxerWrapper,
upstream_size: u64,
random_access: GBoolean)
-> GBoolean {
let wrap: &DemuxerWrapper = &*ptr;
panic_to_error!(wrap, GBoolean::False, {
GBoolean::from_bool(wrap.start(upstream_size, random_access.to_bool()))
})
}
#[no_mangle]
pub unsafe extern "C" fn demuxer_stop(ptr: *const DemuxerWrapper) -> GBoolean {
let wrap: &DemuxerWrapper = &*ptr;
panic_to_error!(wrap, GBoolean::False, {
GBoolean::from_bool(wrap.stop())
})
}
#[no_mangle]
pub unsafe extern "C" fn demuxer_is_seekable(ptr: *const DemuxerWrapper) -> GBoolean {
let wrap: &DemuxerWrapper = &*ptr;
panic_to_error!(wrap, GBoolean::False, {
GBoolean::from_bool(wrap.is_seekable())
})
}
#[no_mangle]
pub unsafe extern "C" fn demuxer_get_position(ptr: *const DemuxerWrapper,
position: *mut u64)
-> GBoolean {
let wrap: &DemuxerWrapper = &*ptr;
panic_to_error!(wrap, GBoolean::False, {
let position = &mut *position;
wrap.get_position(position)
})
}
#[no_mangle]
pub unsafe extern "C" fn demuxer_get_duration(ptr: *const DemuxerWrapper,
duration: *mut u64)
-> GBoolean {
let wrap: &DemuxerWrapper = &*ptr;
panic_to_error!(wrap, GBoolean::False, {
let duration = &mut *duration;
wrap.get_duration(duration)
})
}
#[no_mangle]
pub unsafe extern "C" fn demuxer_seek(ptr: *mut DemuxerWrapper,
start: u64,
stop: u64,
offset: *mut u64)
-> GBoolean {
let wrap: &mut DemuxerWrapper = &mut *ptr;
panic_to_error!(wrap, GBoolean::False, {
let offset = &mut *offset;
GBoolean::from_bool(wrap.seek(start, stop, offset))
})
}
#[no_mangle]
pub unsafe extern "C" fn demuxer_handle_buffer(ptr: *mut DemuxerWrapper,
buffer: *mut c_void)
-> GstFlowReturn {
let wrap: &mut DemuxerWrapper = &mut *ptr;
panic_to_error!(wrap, GstFlowReturn::Error, {
let buffer = Buffer::new_from_ptr_owned(buffer);
wrap.handle_buffer(buffer)
})
}
#[no_mangle]
pub unsafe extern "C" fn demuxer_end_of_stream(ptr: *mut DemuxerWrapper) {
let wrap: &mut DemuxerWrapper = &mut *ptr;
panic_to_error!(wrap, (), {
wrap.end_of_stream();
})
}