s/GstBuffer/GstData/ in the API where you can pass events. Fix the plugins to deal with that. Fixes #113488. Also inc...

Original commit message from CVS:
s/GstBuffer/GstData/ in the API where you can pass events. Fix the plugins to deal with that. Fixes #113488. Also includes scheduler patches, and probably fixes some queue bugs relating to events and buffers.
This commit is contained in:
Andy Wingo 2003-10-08 16:06:02 +00:00
parent 63d21b3e7a
commit 31d748d332
41 changed files with 354 additions and 413 deletions

View file

@ -92,7 +92,7 @@ GST_PAD_TEMPLATE_FACTORY (src_factory,
static void gst_example_class_init (GstExampleClass *klass); static void gst_example_class_init (GstExampleClass *klass);
static void gst_example_init (GstExample *example); static void gst_example_init (GstExample *example);
static void gst_example_chain (GstPad *pad, GstBuffer *buf); static void gst_example_chain (GstPad *pad, GstData *_data);
static void gst_example_set_property (GObject *object, guint prop_id, static void gst_example_set_property (GObject *object, guint prop_id,
const GValue *value, GParamSpec *pspec); const GValue *value, GParamSpec *pspec);
@ -227,8 +227,9 @@ gst_example_init(GstExample *example)
* as the buffer provided by the peer element. * as the buffer provided by the peer element.
*/ */
static void static void
gst_example_chain (GstPad *pad, GstBuffer *buf) gst_example_chain (GstPad *pad, GstData *_data)
{ {
GstBuffer *buf = GST_BUFFER (_data);
GstExample *example; GstExample *example;
GstBuffer *outbuf; GstBuffer *outbuf;
@ -272,7 +273,7 @@ gst_example_chain (GstPad *pad, GstBuffer *buf)
* in the pipeline, through the element's source pad, which is stored * in the pipeline, through the element's source pad, which is stored
* in the element's structure. * in the element's structure.
*/ */
gst_pad_push(example->srcpad,outbuf); gst_pad_push(example->srcpad,GST_DATA (outbuf));
/* For fun we'll emit our useless signal here */ /* For fun we'll emit our useless signal here */
g_signal_emit(G_OBJECT (example), gst_example_signals[ASDF], 0, g_signal_emit(G_OBJECT (example), gst_example_signals[ASDF], 0,
@ -280,7 +281,7 @@ gst_example_chain (GstPad *pad, GstBuffer *buf)
/* If we're not doing something, just send the original incoming buffer. */ /* If we're not doing something, just send the original incoming buffer. */
} else { } else {
gst_pad_push(example->srcpad,buf); gst_pad_push(example->srcpad,GST_DATA (buf));
} }
} }

View file

@ -212,7 +212,7 @@ gst_autoplugcache_loop (GstElement *element)
/* the first time through, the current_playout pointer is going to be NULL */ /* the first time through, the current_playout pointer is going to be NULL */
if (cache->current_playout == NULL) { if (cache->current_playout == NULL) {
/* get a buffer */ /* get a buffer */
buf = gst_pad_pull (cache->sinkpad); buf = GST_BUFFER (gst_pad_pull (cache->sinkpad));
if (GST_IS_EVENT (buf)) { if (GST_IS_EVENT (buf)) {
gst_pad_event_default (cache->sinkpad, GST_EVENT (buf)); gst_pad_event_default (cache->sinkpad, GST_EVENT (buf));
return; return;
@ -229,7 +229,7 @@ gst_autoplugcache_loop (GstElement *element)
g_signal_emit (G_OBJECT(cache), gst_autoplugcache_signals[FIRST_BUFFER], 0, buf); g_signal_emit (G_OBJECT(cache), gst_autoplugcache_signals[FIRST_BUFFER], 0, buf);
/* send the buffer on its way */ /* send the buffer on its way */
gst_pad_push (cache->srcpad, buf); gst_pad_push (cache->srcpad, GST_DATA (buf));
} }
/* the steady state is where the playout is at the front of the cache */ /* the steady state is where the playout is at the front of the cache */
else if (g_list_previous(cache->current_playout) == NULL) { else if (g_list_previous(cache->current_playout) == NULL) {
@ -249,7 +249,7 @@ gst_autoplugcache_loop (GstElement *element)
} }
/* get a buffer */ /* get a buffer */
buf = gst_pad_pull (cache->sinkpad); buf = GST_BUFFER (gst_pad_pull (cache->sinkpad));
if (GST_IS_EVENT (buf)) { if (GST_IS_EVENT (buf)) {
gst_pad_event_default (cache->sinkpad, GST_EVENT (buf)); gst_pad_event_default (cache->sinkpad, GST_EVENT (buf));
return; return;
@ -264,7 +264,7 @@ gst_autoplugcache_loop (GstElement *element)
cache->current_playout = cache->cache; cache->current_playout = cache->cache;
/* send the buffer on its way */ /* send the buffer on its way */
gst_pad_push (cache->srcpad, buf); gst_pad_push (cache->srcpad, GST_DATA (buf));
} }
/* otherwise we're trundling through existing cached buffers */ /* otherwise we're trundling through existing cached buffers */
@ -278,7 +278,7 @@ gst_autoplugcache_loop (GstElement *element)
} }
/* push that buffer */ /* push that buffer */
gst_pad_push (cache->srcpad, GST_BUFFER(cache->current_playout->data)); gst_pad_push (cache->srcpad, GST_DATA (GST_BUFFER(cache->current_playout->data)));
} }
} }

View file

@ -186,7 +186,7 @@ gst_spider_identity_chain (GstPad *pad, GstBuffer *buf)
if (conn->current != (GstElement *) conn->src) { if (conn->current != (GstElement *) conn->src) {
GST_DEBUG ("sending EOS to unconnected element %s from %s", GST_DEBUG ("sending EOS to unconnected element %s from %s",
GST_ELEMENT_NAME (conn->src), GST_ELEMENT_NAME (ident)); GST_ELEMENT_NAME (conn->src), GST_ELEMENT_NAME (ident));
gst_pad_push (conn->src->src, GST_BUFFER (gst_event_new (GST_EVENT_EOS))); gst_pad_push (conn->src->src, GST_DATA (GST_BUFFER (gst_event_new (GST_EVENT_EOS))));
gst_element_set_eos (GST_ELEMENT (conn->src)); gst_element_set_eos (GST_ELEMENT (conn->src));
} }
} }
@ -200,7 +200,7 @@ gst_spider_identity_chain (GstPad *pad, GstBuffer *buf)
if ((ident->src != NULL) && (GST_PAD_PEER (ident->src) != NULL)) { if ((ident->src != NULL) && (GST_PAD_PEER (ident->src) != NULL)) {
/* g_print("pushing buffer %p (refcount %d - buffersize %d) to pad %s:%s\n", buf, GST_BUFFER_REFCOUNT (buf), GST_BUFFER_SIZE (buf), GST_DEBUG_PAD_NAME (ident->src)); */ /* g_print("pushing buffer %p (refcount %d - buffersize %d) to pad %s:%s\n", buf, GST_BUFFER_REFCOUNT (buf), GST_BUFFER_SIZE (buf), GST_DEBUG_PAD_NAME (ident->src)); */
GST_LOG ( "push %p %" G_GINT64_FORMAT, buf, GST_BUFFER_OFFSET (buf)); GST_LOG ( "push %p %" G_GINT64_FORMAT, buf, GST_BUFFER_OFFSET (buf));
gst_pad_push (ident->src, buf); gst_pad_push (ident->src, GST_DATA (buf));
} else if (GST_IS_BUFFER (buf)) { } else if (GST_IS_BUFFER (buf)) {
gst_buffer_unref (buf); gst_buffer_unref (buf);
} }
@ -386,7 +386,7 @@ gst_spider_identity_dumb_loop (GstSpiderIdentity *ident)
g_return_if_fail (GST_IS_SPIDER_IDENTITY (ident)); g_return_if_fail (GST_IS_SPIDER_IDENTITY (ident));
g_assert (ident->sink != NULL); g_assert (ident->sink != NULL);
buf = gst_pad_pull (ident->sink); buf = GST_BUFFER (gst_pad_pull (ident->sink));
gst_spider_identity_chain (ident->sink, buf); gst_spider_identity_chain (ident->sink, buf);
} }

View file

@ -92,7 +92,7 @@ static void gst_aggregator_set_property (GObject *object, guint prop_id,
static void gst_aggregator_get_property (GObject *object, guint prop_id, static void gst_aggregator_get_property (GObject *object, guint prop_id,
GValue *value, GParamSpec *pspec); GValue *value, GParamSpec *pspec);
static void gst_aggregator_chain (GstPad *pad, GstBuffer *buf); static void gst_aggregator_chain (GstPad *pad, GstData *_data);
static void gst_aggregator_loop (GstElement *element); static void gst_aggregator_loop (GstElement *element);
static GstElementClass *parent_class = NULL; static GstElementClass *parent_class = NULL;
@ -286,7 +286,7 @@ gst_aggregator_push (GstAggregator *aggregator, GstPad *pad, GstBuffer *buf, guc
g_object_notify (G_OBJECT (aggregator), "last_message"); g_object_notify (G_OBJECT (aggregator), "last_message");
} }
gst_pad_push (aggregator->srcpad, buf); gst_pad_push (aggregator->srcpad, GST_DATA (buf));
} }
static void static void
@ -313,7 +313,7 @@ gst_aggregator_loop (GstElement *element)
* and that the peer pad is also enabled. * and that the peer pad is also enabled.
*/ */
if (GST_PAD_IS_USABLE (pad)) { if (GST_PAD_IS_USABLE (pad)) {
buf = gst_pad_pull (pad); buf = GST_BUFFER (gst_pad_pull (pad));
debug = "loop"; debug = "loop";
/* then push it forward */ /* then push it forward */
@ -328,7 +328,7 @@ gst_aggregator_loop (GstElement *element)
debug = "loop_select"; debug = "loop_select";
pad = gst_pad_select (aggregator->sinkpads); pad = gst_pad_select (aggregator->sinkpads);
buf = gst_pad_pull (pad); buf = GST_BUFFER (gst_pad_pull (pad));
gst_aggregator_push (aggregator, pad, buf, debug); gst_aggregator_push (aggregator, pad, buf, debug);
} }
@ -346,8 +346,9 @@ gst_aggregator_loop (GstElement *element)
* Chain a buffer on a pad. * Chain a buffer on a pad.
*/ */
static void static void
gst_aggregator_chain (GstPad *pad, GstBuffer *buf) gst_aggregator_chain (GstPad *pad, GstData *_data)
{ {
GstBuffer *buf = GST_BUFFER (_data);
GstAggregator *aggregator; GstAggregator *aggregator;
g_return_if_fail (pad != NULL); g_return_if_fail (pad != NULL);

View file

@ -102,7 +102,7 @@ static void gst_fakesink_get_property (GObject *object, guint prop_id,
static GstElementStateReturn static GstElementStateReturn
gst_fakesink_change_state (GstElement *element); gst_fakesink_change_state (GstElement *element);
static void gst_fakesink_chain (GstPad *pad, GstBuffer *buf); static void gst_fakesink_chain (GstPad *pad, GstData *_data);
static GstElementClass *parent_class = NULL; static GstElementClass *parent_class = NULL;
static guint gst_fakesink_signals[LAST_SIGNAL] = { 0 }; static guint gst_fakesink_signals[LAST_SIGNAL] = { 0 };
@ -298,8 +298,9 @@ gst_fakesink_get_property (GObject *object, guint prop_id, GValue *value, GParam
} }
static void static void
gst_fakesink_chain (GstPad *pad, GstBuffer *buf) gst_fakesink_chain (GstPad *pad, GstData *_data)
{ {
GstBuffer *buf = GST_BUFFER (_data);
GstFakeSink *fakesink; GstFakeSink *fakesink;
g_return_if_fail (pad != NULL); g_return_if_fail (pad != NULL);

View file

@ -170,7 +170,7 @@ static void gst_fakesrc_get_property (GObject *object, guint prop_id,
static GstElementStateReturn gst_fakesrc_change_state (GstElement *element); static GstElementStateReturn gst_fakesrc_change_state (GstElement *element);
static GstBuffer* gst_fakesrc_get (GstPad *pad); static GstData* gst_fakesrc_get (GstPad *pad);
static void gst_fakesrc_loop (GstElement *element); static void gst_fakesrc_loop (GstElement *element);
static GstElementClass *parent_class = NULL; static GstElementClass *parent_class = NULL;
@ -734,7 +734,7 @@ gst_fakesrc_create_buffer (GstFakeSrc *src)
return buf; return buf;
} }
static GstBuffer * static GstData *
gst_fakesrc_get(GstPad *pad) gst_fakesrc_get(GstPad *pad)
{ {
GstFakeSrc *src; GstFakeSrc *src;
@ -748,22 +748,22 @@ gst_fakesrc_get(GstPad *pad)
if (src->need_flush) { if (src->need_flush) {
src->need_flush = FALSE; src->need_flush = FALSE;
return GST_BUFFER(gst_event_new (GST_EVENT_FLUSH)); return GST_DATA(gst_event_new (GST_EVENT_FLUSH));
} }
if (src->buffer_count == src->segment_end) { if (src->buffer_count == src->segment_end) {
if (src->segment_loop) { if (src->segment_loop) {
return GST_BUFFER(gst_event_new (GST_EVENT_SEGMENT_DONE)); return GST_DATA(gst_event_new (GST_EVENT_SEGMENT_DONE));
} }
else { else {
gst_element_set_eos (GST_ELEMENT (src)); gst_element_set_eos (GST_ELEMENT (src));
return GST_BUFFER(gst_event_new (GST_EVENT_EOS)); return GST_DATA(gst_event_new (GST_EVENT_EOS));
} }
} }
if (src->rt_num_buffers == 0) { if (src->rt_num_buffers == 0) {
gst_element_set_eos (GST_ELEMENT (src)); gst_element_set_eos (GST_ELEMENT (src));
return GST_BUFFER(gst_event_new (GST_EVENT_EOS)); return GST_DATA(gst_event_new (GST_EVENT_EOS));
} }
else { else {
if (src->rt_num_buffers > 0) if (src->rt_num_buffers > 0)
@ -772,7 +772,7 @@ gst_fakesrc_get(GstPad *pad)
if (src->eos) { if (src->eos) {
GST_INFO ( "fakesrc is setting eos on pad"); GST_INFO ( "fakesrc is setting eos on pad");
return GST_BUFFER(gst_event_new (GST_EVENT_EOS)); return GST_DATA(gst_event_new (GST_EVENT_EOS));
} }
buf = gst_fakesrc_create_buffer (src); buf = gst_fakesrc_create_buffer (src);
@ -795,7 +795,7 @@ gst_fakesrc_get(GstPad *pad)
GST_LOG_OBJECT (src, "post handoff emit"); GST_LOG_OBJECT (src, "post handoff emit");
} }
return buf; return GST_DATA (buf);
} }
/** /**
@ -819,10 +819,10 @@ gst_fakesrc_loop(GstElement *element)
while (pads) { while (pads) {
GstPad *pad = GST_PAD (pads->data); GstPad *pad = GST_PAD (pads->data);
GstBuffer *buf; GstData *data;
buf = gst_fakesrc_get (pad); data = gst_fakesrc_get (pad);
gst_pad_push (pad, buf); gst_pad_push (pad, data);
if (src->eos) { if (src->eos) {
return; return;

View file

@ -61,7 +61,7 @@ static void gst_fdsink_set_property (GObject *object, guint prop_id,
static void gst_fdsink_get_property (GObject *object, guint prop_id, static void gst_fdsink_get_property (GObject *object, guint prop_id,
GValue *value, GParamSpec *pspec); GValue *value, GParamSpec *pspec);
static void gst_fdsink_chain (GstPad *pad,GstBuffer *buf); static void gst_fdsink_chain (GstPad *pad,GstData *_data);
static GstElementClass *parent_class = NULL; static GstElementClass *parent_class = NULL;
/*static guint gst_fdsink_signals[LAST_SIGNAL] = { 0 };*/ /*static guint gst_fdsink_signals[LAST_SIGNAL] = { 0 };*/
@ -115,8 +115,9 @@ gst_fdsink_init (GstFdSink *fdsink)
} }
static void static void
gst_fdsink_chain (GstPad *pad, GstBuffer *buf) gst_fdsink_chain (GstPad *pad, GstData *_data)
{ {
GstBuffer *buf = GST_BUFFER (_data);
GstFdSink *fdsink; GstFdSink *fdsink;
g_return_if_fail (pad != NULL); g_return_if_fail (pad != NULL);

View file

@ -71,7 +71,7 @@ static void gst_fdsrc_set_property (GObject *object, guint prop_id,
static void gst_fdsrc_get_property (GObject *object, guint prop_id, static void gst_fdsrc_get_property (GObject *object, guint prop_id,
GValue *value, GParamSpec *pspec); GValue *value, GParamSpec *pspec);
static GstBuffer * gst_fdsrc_get (GstPad *pad); static GstData * gst_fdsrc_get (GstPad *pad);
static GstElementClass *parent_class = NULL; static GstElementClass *parent_class = NULL;
@ -177,7 +177,7 @@ gst_fdsrc_get_property (GObject *object, guint prop_id, GValue *value, GParamSpe
} }
} }
static GstBuffer * static GstData *
gst_fdsrc_get(GstPad *pad) gst_fdsrc_get(GstPad *pad)
{ {
GstFdSrc *src; GstFdSrc *src;
@ -195,14 +195,14 @@ gst_fdsrc_get(GstPad *pad)
/* if nothing was read, we're in eos */ /* if nothing was read, we're in eos */
if (readbytes == 0) { if (readbytes == 0) {
gst_element_set_eos (GST_ELEMENT (src)); gst_element_set_eos (GST_ELEMENT (src));
return GST_BUFFER (gst_event_new (GST_EVENT_EOS)); return GST_DATA (gst_event_new (GST_EVENT_EOS));
} }
if (readbytes == -1) { if (readbytes == -1) {
g_error ("Error reading from file descriptor. Ending stream.\n"); g_error ("Error reading from file descriptor. Ending stream.\n");
gst_element_set_eos (GST_ELEMENT (src)); gst_element_set_eos (GST_ELEMENT (src));
return GST_BUFFER (gst_event_new (GST_EVENT_EOS)); return GST_DATA (gst_event_new (GST_EVENT_EOS));
} }
GST_BUFFER_OFFSET (buf) = src->curoffset; GST_BUFFER_OFFSET (buf) = src->curoffset;
GST_BUFFER_SIZE (buf) = readbytes; GST_BUFFER_SIZE (buf) = readbytes;
@ -210,5 +210,5 @@ gst_fdsrc_get(GstPad *pad)
src->curoffset += readbytes; src->curoffset += readbytes;
/* we're done, return the buffer */ /* we're done, return the buffer */
return buf; return GST_DATA (buf);
} }

View file

@ -83,7 +83,7 @@ static void gst_filesink_close_file (GstFileSink *sink);
static gboolean gst_filesink_handle_event (GstPad *pad, GstEvent *event); static gboolean gst_filesink_handle_event (GstPad *pad, GstEvent *event);
static gboolean gst_filesink_pad_query (GstPad *pad, GstQueryType type, static gboolean gst_filesink_pad_query (GstPad *pad, GstQueryType type,
GstFormat *format, gint64 *value); GstFormat *format, gint64 *value);
static void gst_filesink_chain (GstPad *pad,GstBuffer *buf); static void gst_filesink_chain (GstPad *pad,GstData *_data);
static GstElementStateReturn gst_filesink_change_state (GstElement *element); static GstElementStateReturn gst_filesink_change_state (GstElement *element);
@ -361,8 +361,9 @@ gst_filesink_handle_event (GstPad *pad, GstEvent *event)
* take the buffer from the pad and write to file if it's open * take the buffer from the pad and write to file if it's open
*/ */
static void static void
gst_filesink_chain (GstPad *pad, GstBuffer *buf) gst_filesink_chain (GstPad *pad, GstData *_data)
{ {
GstBuffer *buf = GST_BUFFER (_data);
GstFileSink *filesink; GstFileSink *filesink;
g_return_if_fail (pad != NULL); g_return_if_fail (pad != NULL);

View file

@ -142,7 +142,7 @@ static void gst_filesrc_set_property (GObject *object, guint prop_id,
static void gst_filesrc_get_property (GObject *object, guint prop_id, static void gst_filesrc_get_property (GObject *object, guint prop_id,
GValue *value, GParamSpec *pspec); GValue *value, GParamSpec *pspec);
static GstBuffer * gst_filesrc_get (GstPad *pad); static GstData * gst_filesrc_get (GstPad *pad);
static gboolean gst_filesrc_srcpad_event (GstPad *pad, GstEvent *event); static gboolean gst_filesrc_srcpad_event (GstPad *pad, GstEvent *event);
static gboolean gst_filesrc_srcpad_query (GstPad *pad, GstQueryType type, static gboolean gst_filesrc_srcpad_query (GstPad *pad, GstQueryType type,
GstFormat *format, gint64 *value); GstFormat *format, gint64 *value);
@ -650,7 +650,7 @@ gst_filesrc_get_read (GstFileSrc *src)
return buf; return buf;
} }
static GstBuffer * static GstData *
gst_filesrc_get (GstPad *pad) gst_filesrc_get (GstPad *pad)
{ {
GstFileSrc *src; GstFileSrc *src;
@ -667,13 +667,13 @@ gst_filesrc_get (GstPad *pad)
GST_DEBUG ("filesrc sending discont"); GST_DEBUG ("filesrc sending discont");
event = gst_event_new_discontinuous (FALSE, GST_FORMAT_BYTES, src->curoffset, NULL); event = gst_event_new_discontinuous (FALSE, GST_FORMAT_BYTES, src->curoffset, NULL);
src->need_flush = FALSE; src->need_flush = FALSE;
return GST_BUFFER (event); return GST_DATA (event);
} }
/* check for flush */ /* check for flush */
if (src->need_flush) { if (src->need_flush) {
src->need_flush = FALSE; src->need_flush = FALSE;
GST_DEBUG ("filesrc sending flush"); GST_DEBUG ("filesrc sending flush");
return GST_BUFFER (gst_event_new_flush ()); return GST_DATA (gst_event_new_flush ());
} }
/* check for EOF */ /* check for EOF */
@ -681,13 +681,13 @@ gst_filesrc_get (GstPad *pad)
GST_DEBUG ("filesrc eos %" G_GINT64_FORMAT" %" G_GINT64_FORMAT, GST_DEBUG ("filesrc eos %" G_GINT64_FORMAT" %" G_GINT64_FORMAT,
src->curoffset, src->filelen); src->curoffset, src->filelen);
gst_element_set_eos (GST_ELEMENT (src)); gst_element_set_eos (GST_ELEMENT (src));
return GST_BUFFER (gst_event_new (GST_EVENT_EOS)); return GST_DATA (gst_event_new (GST_EVENT_EOS));
} }
if (src->using_mmap){ if (src->using_mmap){
return gst_filesrc_get_mmap (src); return GST_DATA (gst_filesrc_get_mmap (src));
}else{ }else{
return gst_filesrc_get_read (src); return GST_DATA (gst_filesrc_get_read (src));
} }
} }

View file

@ -70,7 +70,7 @@ static void gst_identity_init (GstIdentity *identity);
static void gst_identity_set_property (GObject *object, guint prop_id, const GValue *value, GParamSpec *pspec); static void gst_identity_set_property (GObject *object, guint prop_id, const GValue *value, GParamSpec *pspec);
static void gst_identity_get_property (GObject *object, guint prop_id, GValue *value, GParamSpec *pspec); static void gst_identity_get_property (GObject *object, guint prop_id, GValue *value, GParamSpec *pspec);
static void gst_identity_chain (GstPad *pad, GstBuffer *buf); static void gst_identity_chain (GstPad *pad, GstData *_data);
static GstElementClass *parent_class = NULL; static GstElementClass *parent_class = NULL;
static guint gst_identity_signals[LAST_SIGNAL] = { 0 }; static guint gst_identity_signals[LAST_SIGNAL] = { 0 };
@ -224,8 +224,9 @@ gst_identity_init (GstIdentity *identity)
} }
static void static void
gst_identity_chain (GstPad *pad, GstBuffer *buf) gst_identity_chain (GstPad *pad, GstData *_data)
{ {
GstBuffer *buf = GST_BUFFER (_data);
GstIdentity *identity; GstIdentity *identity;
guint i; guint i;
@ -286,7 +287,7 @@ gst_identity_chain (GstPad *pad, GstBuffer *buf)
if (i>1) if (i>1)
gst_buffer_ref (buf); gst_buffer_ref (buf);
gst_pad_push (identity->srcpad, buf); gst_pad_push (identity->srcpad, GST_DATA (buf));
if (identity->sleep_time) if (identity->sleep_time)
g_usleep (identity->sleep_time); g_usleep (identity->sleep_time);
@ -304,7 +305,7 @@ gst_identity_loop (GstElement *element)
identity = GST_IDENTITY (element); identity = GST_IDENTITY (element);
buf = gst_pad_pull (identity->sinkpad); buf = GST_BUFFER (gst_pad_pull (identity->sinkpad));
if (GST_IS_EVENT (buf)) { if (GST_IS_EVENT (buf)) {
GstEvent *event = GST_EVENT (buf); GstEvent *event = GST_EVENT (buf);
@ -316,7 +317,7 @@ gst_identity_loop (GstElement *element)
} }
} }
else { else {
gst_identity_chain (identity->sinkpad, buf); gst_identity_chain (identity->sinkpad, GST_DATA (buf));
} }
} }

View file

@ -63,7 +63,7 @@ static void gst_md5sink_init (GstMD5Sink *md5sink);
static void gst_md5sink_get_property (GObject *object, guint prop_id, static void gst_md5sink_get_property (GObject *object, guint prop_id,
GValue *value, GParamSpec *pspec); GValue *value, GParamSpec *pspec);
static void gst_md5sink_chain (GstPad *pad, GstBuffer *buf); static void gst_md5sink_chain (GstPad *pad, GstData *_data);
static GstElementStateReturn gst_md5sink_change_state (GstElement *element); static GstElementStateReturn gst_md5sink_change_state (GstElement *element);
/* variables */ /* variables */
@ -489,8 +489,9 @@ gst_md5sink_get_property (GObject *object, guint prop_id, GValue *value, GParamS
} }
static void static void
gst_md5sink_chain (GstPad *pad, GstBuffer *buf) gst_md5sink_chain (GstPad *pad, GstData *_data)
{ {
GstBuffer *buf = GST_BUFFER (_data);
GstMD5Sink *md5sink; GstMD5Sink *md5sink;
g_return_if_fail (pad != NULL); g_return_if_fail (pad != NULL);

View file

@ -63,7 +63,7 @@ static void gst_multidisksrc_init (GstMultiDiskSrc *disksrc);
static void gst_multidisksrc_set_property (GObject *object, guint prop_id, const GValue *value, GParamSpec *pspec); static void gst_multidisksrc_set_property (GObject *object, guint prop_id, const GValue *value, GParamSpec *pspec);
static void gst_multidisksrc_get_property (GObject *object, guint prop_id, GValue *value, GParamSpec *pspec); static void gst_multidisksrc_get_property (GObject *object, guint prop_id, GValue *value, GParamSpec *pspec);
static GstBuffer * gst_multidisksrc_get (GstPad *pad); static GstData * gst_multidisksrc_get (GstPad *pad);
/*static GstBuffer * gst_multidisksrc_get_region (GstPad *pad,GstRegionType type,guint64 offset,guint64 len);*/ /*static GstBuffer * gst_multidisksrc_get_region (GstPad *pad,GstRegionType type,guint64 offset,guint64 len);*/
static GstElementStateReturn gst_multidisksrc_change_state (GstElement *element); static GstElementStateReturn gst_multidisksrc_change_state (GstElement *element);
@ -195,7 +195,7 @@ gst_multidisksrc_get_property (GObject *object, guint prop_id, GValue *value, GP
* *
* Push a new buffer from the disksrc at the current offset. * Push a new buffer from the disksrc at the current offset.
*/ */
static GstBuffer * static GstData *
gst_multidisksrc_get (GstPad *pad) gst_multidisksrc_get (GstPad *pad)
{ {
GstMultiDiskSrc *src; GstMultiDiskSrc *src;
@ -209,7 +209,7 @@ gst_multidisksrc_get (GstPad *pad)
gst_multidisksrc_close_file(src); gst_multidisksrc_close_file(src);
if (!src->listptr) { if (!src->listptr) {
return GST_BUFFER(gst_event_new (GST_EVENT_EOS)); return GST_DATA (gst_event_new (GST_EVENT_EOS));
} }
list = src->listptr; list = src->listptr;
@ -240,7 +240,7 @@ gst_multidisksrc_get (GstPad *pad)
} }
/* we're done, return the buffer */ /* we're done, return the buffer */
return buf; return GST_DATA (buf);
} }
/* open the file and mmap it, necessary to go to READY state */ /* open the file and mmap it, necessary to go to READY state */

View file

@ -63,7 +63,7 @@ static void gst_multidisksrc_init (GstMultiDiskSrc *disksrc);
static void gst_multidisksrc_set_property (GObject *object, guint prop_id, const GValue *value, GParamSpec *pspec); static void gst_multidisksrc_set_property (GObject *object, guint prop_id, const GValue *value, GParamSpec *pspec);
static void gst_multidisksrc_get_property (GObject *object, guint prop_id, GValue *value, GParamSpec *pspec); static void gst_multidisksrc_get_property (GObject *object, guint prop_id, GValue *value, GParamSpec *pspec);
static GstBuffer * gst_multidisksrc_get (GstPad *pad); static GstData * gst_multidisksrc_get (GstPad *pad);
/*static GstBuffer * gst_multidisksrc_get_region (GstPad *pad,GstRegionType type,guint64 offset,guint64 len);*/ /*static GstBuffer * gst_multidisksrc_get_region (GstPad *pad,GstRegionType type,guint64 offset,guint64 len);*/
static GstElementStateReturn gst_multidisksrc_change_state (GstElement *element); static GstElementStateReturn gst_multidisksrc_change_state (GstElement *element);
@ -195,7 +195,7 @@ gst_multidisksrc_get_property (GObject *object, guint prop_id, GValue *value, GP
* *
* Push a new buffer from the disksrc at the current offset. * Push a new buffer from the disksrc at the current offset.
*/ */
static GstBuffer * static GstData *
gst_multidisksrc_get (GstPad *pad) gst_multidisksrc_get (GstPad *pad)
{ {
GstMultiDiskSrc *src; GstMultiDiskSrc *src;
@ -209,7 +209,7 @@ gst_multidisksrc_get (GstPad *pad)
gst_multidisksrc_close_file(src); gst_multidisksrc_close_file(src);
if (!src->listptr) { if (!src->listptr) {
return GST_BUFFER(gst_event_new (GST_EVENT_EOS)); return GST_DATA (gst_event_new (GST_EVENT_EOS));
} }
list = src->listptr; list = src->listptr;
@ -240,7 +240,7 @@ gst_multidisksrc_get (GstPad *pad)
} }
/* we're done, return the buffer */ /* we're done, return the buffer */
return buf; return GST_DATA (buf);
} }
/* open the file and mmap it, necessary to go to READY state */ /* open the file and mmap it, necessary to go to READY state */

View file

@ -68,8 +68,8 @@ static void gst_pipefilter_init (GstPipefilter *pipefilter);
static void gst_pipefilter_set_property (GObject *object, guint prop_id, const GValue *value, GParamSpec *pspec); static void gst_pipefilter_set_property (GObject *object, guint prop_id, const GValue *value, GParamSpec *pspec);
static void gst_pipefilter_get_property (GObject *object, guint prop_id, GValue *value, GParamSpec *pspec); static void gst_pipefilter_get_property (GObject *object, guint prop_id, GValue *value, GParamSpec *pspec);
static GstBuffer* gst_pipefilter_get (GstPad *pad); static GstData* gst_pipefilter_get (GstPad *pad);
static void gst_pipefilter_chain (GstPad *pad, GstBuffer *buf); static void gst_pipefilter_chain (GstPad *pad, GstData *_data);
static gboolean gst_pipefilter_handle_event (GstPad *pad, GstEvent *event); static gboolean gst_pipefilter_handle_event (GstPad *pad, GstEvent *event);
static GstElementStateReturn gst_pipefilter_change_state (GstElement *element); static GstElementStateReturn gst_pipefilter_change_state (GstElement *element);
@ -155,7 +155,7 @@ gst_pipefilter_handle_event (GstPad *pad, GstEvent *event)
return TRUE; return TRUE;
} }
static GstBuffer* static GstData*
gst_pipefilter_get (GstPad *pad) gst_pipefilter_get (GstPad *pad)
{ {
GstPipefilter *pipefilter; GstPipefilter *pipefilter;
@ -184,7 +184,7 @@ gst_pipefilter_get (GstPad *pad)
} }
/* if we didn't get as many bytes as we asked for, we're at EOF */ /* if we didn't get as many bytes as we asked for, we're at EOF */
if (readbytes == 0) { if (readbytes == 0) {
return GST_BUFFER(gst_event_new (GST_EVENT_EOS)); return GST_DATA (gst_event_new (GST_EVENT_EOS));
} }
@ -192,12 +192,13 @@ gst_pipefilter_get (GstPad *pad)
GST_BUFFER_SIZE(newbuf) = readbytes; GST_BUFFER_SIZE(newbuf) = readbytes;
pipefilter->curoffset += readbytes; pipefilter->curoffset += readbytes;
return newbuf; return GST_DATA (newbuf);
} }
static void static void
gst_pipefilter_chain (GstPad *pad,GstBuffer *buf) gst_pipefilter_chain (GstPad *pad,GstData *_data)
{ {
GstBuffer *buf = GST_BUFFER (_data);
GstPipefilter *pipefilter; GstPipefilter *pipefilter;
glong writebytes; glong writebytes;
guchar *data; guchar *data;

View file

@ -286,14 +286,14 @@ gst_shaper_loop (GstElement *element)
if (connection->buffer == NULL && GST_PAD_IS_USABLE (connection->sinkpad)) { if (connection->buffer == NULL && GST_PAD_IS_USABLE (connection->sinkpad)) {
GstBuffer *buffer; GstBuffer *buffer;
buffer = gst_pad_pull (connection->sinkpad); buffer = GST_BUFFER (gst_pad_pull (connection->sinkpad));
/* events are simply pushed ASAP */ /* events are simply pushed ASAP */
if (GST_IS_EVENT (buffer)) { if (GST_IS_EVENT (buffer)) {
/* save event type as it will be unreffed after the next push */ /* save event type as it will be unreffed after the next push */
GstEventType type = GST_EVENT_TYPE (buffer); GstEventType type = GST_EVENT_TYPE (buffer);
gst_pad_push (connection->srcpad, buffer); gst_pad_push (connection->srcpad, GST_DATA (buffer));
switch (type) { switch (type) {
/* on EOS we disable the pad so that we don't pull on /* on EOS we disable the pad so that we don't pull on
@ -322,7 +322,7 @@ gst_shaper_loop (GstElement *element)
} }
/* if we have a connection with a buffer, push it */ /* if we have a connection with a buffer, push it */
if (min != NULL && min->buffer) { if (min != NULL && min->buffer) {
gst_pad_push (min->srcpad, min->buffer); gst_pad_push (min->srcpad, GST_DATA (min->buffer));
min->buffer = NULL; min->buffer = NULL;
/* since we pushed a buffer, it's not EOS */ /* since we pushed a buffer, it's not EOS */
eos = FALSE; eos = FALSE;

View file

@ -67,7 +67,7 @@ static void gst_statistics_init (GstStatistics *statistics);
static void gst_statistics_set_property (GObject *object, guint prop_id, const GValue *value, GParamSpec *pspec); static void gst_statistics_set_property (GObject *object, guint prop_id, const GValue *value, GParamSpec *pspec);
static void gst_statistics_get_property (GObject *object, guint prop_id, GValue *value, GParamSpec *pspec); static void gst_statistics_get_property (GObject *object, guint prop_id, GValue *value, GParamSpec *pspec);
static void gst_statistics_chain (GstPad *pad, GstBuffer *buf); static void gst_statistics_chain (GstPad *pad, GstData *_data);
static void gst_statistics_reset (GstStatistics *statistics); static void gst_statistics_reset (GstStatistics *statistics);
static void gst_statistics_print (GstStatistics *statistics); static void gst_statistics_print (GstStatistics *statistics);
@ -256,8 +256,9 @@ gst_statistics_print (GstStatistics *statistics)
} }
static void static void
gst_statistics_chain (GstPad *pad, GstBuffer *buf) gst_statistics_chain (GstPad *pad, GstData *_data)
{ {
GstBuffer *buf = GST_BUFFER (_data);
GstStatistics *statistics; GstStatistics *statistics;
gboolean update = FALSE; gboolean update = FALSE;
@ -313,7 +314,7 @@ gst_statistics_chain (GstPad *pad, GstBuffer *buf)
gst_statistics_print(statistics); gst_statistics_print(statistics);
} }
} }
gst_pad_push (statistics->srcpad, buf); gst_pad_push (statistics->srcpad, GST_DATA (buf));
} }
static void static void

View file

@ -71,7 +71,7 @@ static void gst_tee_set_property (GObject *object, guint prop_id,
static void gst_tee_get_property (GObject *object, guint prop_id, static void gst_tee_get_property (GObject *object, guint prop_id,
GValue *value, GParamSpec *pspec); GValue *value, GParamSpec *pspec);
static void gst_tee_chain (GstPad *pad, GstBuffer *buf); static void gst_tee_chain (GstPad *pad, GstData *_data);
static GstElementClass *parent_class = NULL; static GstElementClass *parent_class = NULL;
@ -342,8 +342,9 @@ gst_tee_get_property (GObject *object, guint prop_id, GValue *value, GParamSpec
* Chain a buffer on a pad. * Chain a buffer on a pad.
*/ */
static void static void
gst_tee_chain (GstPad *pad, GstBuffer *buf) gst_tee_chain (GstPad *pad, GstData *_data)
{ {
GstBuffer *buf = GST_BUFFER (_data);
GstTee *tee; GstTee *tee;
const GList *pads; const GList *pads;
@ -373,7 +374,7 @@ gst_tee_chain (GstPad *pad, GstBuffer *buf)
} }
if (GST_PAD_IS_USABLE (outpad)) if (GST_PAD_IS_USABLE (outpad))
gst_pad_push (outpad, buf); gst_pad_push (outpad, GST_DATA (buf));
else else
gst_buffer_unref (buf); gst_buffer_unref (buf);
} }

View file

@ -150,7 +150,7 @@ gst_bytestream_get_next_buf (GstByteStream *bs)
return FALSE; return FALSE;
GST_DEBUG ("get_next_buf: pulling buffer"); GST_DEBUG ("get_next_buf: pulling buffer");
nextbuf = gst_pad_pull (bs->pad); nextbuf = GST_BUFFER (gst_pad_pull (bs->pad));
if (!nextbuf) if (!nextbuf)
return FALSE; return FALSE;

View file

@ -2302,15 +2302,14 @@ gst_ghost_pad_save_thyself (GstPad *pad, xmlNodePtr parent)
/** /**
* gst_pad_push: * gst_pad_push:
* @pad: a #GstPad to push the buffer out of. * @pad: a #GstPad to push the buffer out of.
* @buf: the #GstBuffer to push. * @data: the #GstData to push.
* *
* Pushes a buffer to the peer of the pad. * Pushes a buffer or an event to the peer of the pad.
*/ */
void void
gst_pad_push (GstPad *pad, GstBuffer *buf) gst_pad_push (GstPad *pad, GstData *data)
{ {
GstRealPad *peer; GstRealPad *peer;
GstData *data = GST_DATA(buf);
g_assert (GST_IS_PAD (pad)); g_assert (GST_IS_PAD (pad));
GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, pad, "pushing"); GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, pad, "pushing");
@ -2342,7 +2341,7 @@ gst_pad_push (GstPad *pad, GstBuffer *buf)
if (!gst_probe_dispatcher_dispatch (&peer->probedisp, &data)) if (!gst_probe_dispatcher_dispatch (&peer->probedisp, &data))
return; return;
(peer->chainhandler) (GST_PAD_CAST (peer), (GstBuffer *)data); (peer->chainhandler) (GST_PAD_CAST (peer), data);
return; return;
} }
else { else {
@ -2364,11 +2363,11 @@ gst_pad_push (GstPad *pad, GstBuffer *buf)
* gst_pad_pull: * gst_pad_pull:
* @pad: a #GstPad to pull a buffer from. * @pad: a #GstPad to pull a buffer from.
* *
* Pulls a buffer from the peer pad. * Pulls an event or a buffer from the peer pad.
* *
* Returns: a new #GstBuffer from the peer pad. * Returns: a new #GstData from the peer pad.
*/ */
GstBuffer* GstData*
gst_pad_pull (GstPad *pad) gst_pad_pull (GstPad *pad)
{ {
GstRealPad *peer; GstRealPad *peer;
@ -2376,7 +2375,7 @@ gst_pad_pull (GstPad *pad)
GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, pad, "pulling"); GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, pad, "pulling");
g_return_val_if_fail (GST_PAD_DIRECTION (pad) == GST_PAD_SINK, g_return_val_if_fail (GST_PAD_DIRECTION (pad) == GST_PAD_SINK,
GST_BUFFER (gst_event_new (GST_EVENT_INTERRUPT))); GST_DATA (gst_event_new (GST_EVENT_INTERRUPT)));
peer = GST_RPAD_PEER (pad); peer = GST_RPAD_PEER (pad);
@ -2396,12 +2395,12 @@ restart:
GST_DEBUG_FUNCPTR_NAME (peer->gethandler), GST_DEBUG_FUNCPTR_NAME (peer->gethandler),
GST_DEBUG_PAD_NAME (peer)); GST_DEBUG_PAD_NAME (peer));
data = GST_DATA((peer->gethandler) (GST_PAD_CAST (peer))); data = (peer->gethandler) (GST_PAD_CAST (peer));
if (data) { if (data) {
if (!gst_probe_dispatcher_dispatch (&peer->probedisp, &data)) if (!gst_probe_dispatcher_dispatch (&peer->probedisp, &data))
goto restart; goto restart;
return GST_BUFFER(data); return data;
} }
/* no null buffers allowed */ /* no null buffers allowed */
@ -2416,7 +2415,7 @@ restart:
GST_DEBUG_PAD_NAME (pad), GST_DEBUG_PAD_NAME (peer)); GST_DEBUG_PAD_NAME (pad), GST_DEBUG_PAD_NAME (peer));
} }
} }
return GST_BUFFER (gst_event_new (GST_EVENT_INTERRUPT)); return GST_DATA (gst_event_new (GST_EVENT_INTERRUPT));
} }
/** /**
@ -2891,7 +2890,7 @@ gst_pad_event_default_dispatch (GstPad *pad, GstElement *element,
if (GST_PAD_DIRECTION (eventpad) == GST_PAD_SRC) { if (GST_PAD_DIRECTION (eventpad) == GST_PAD_SRC) {
/* increase the refcount */ /* increase the refcount */
gst_event_ref (event); gst_event_ref (event);
gst_pad_push (eventpad, GST_BUFFER (event)); gst_pad_push (eventpad, GST_DATA (event));
} }
else { else {
GstPad *peerpad = GST_PAD_CAST (GST_RPAD_PEER (eventpad)); GstPad *peerpad = GST_PAD_CAST (GST_RPAD_PEER (eventpad));

View file

@ -133,8 +133,8 @@ typedef enum {
/* this defines the functions used to chain buffers /* this defines the functions used to chain buffers
* pad is the sink pad (so the same chain function can be used for N pads) * pad is the sink pad (so the same chain function can be used for N pads)
* buf is the buffer being passed */ * buf is the buffer being passed */
typedef void (*GstPadChainFunction) (GstPad *pad,GstBuffer *buf); typedef void (*GstPadChainFunction) (GstPad *pad,GstData *data);
typedef GstBuffer* (*GstPadGetFunction) (GstPad *pad); typedef GstData* (*GstPadGetFunction) (GstPad *pad);
typedef gboolean (*GstPadEventFunction) (GstPad *pad, GstEvent *event); typedef gboolean (*GstPadEventFunction) (GstPad *pad, GstEvent *event);
typedef gboolean (*GstPadConvertFunction) (GstPad *pad, typedef gboolean (*GstPadConvertFunction) (GstPad *pad,
GstFormat src_format, gint64 src_value, GstFormat src_format, gint64 src_value,
@ -470,8 +470,8 @@ gboolean gst_pad_recalc_allowed_caps (GstPad *pad);
gboolean gst_pad_recover_caps_error (GstPad *pad, GstCaps *allowed); gboolean gst_pad_recover_caps_error (GstPad *pad, GstCaps *allowed);
/* data passing functions */ /* data passing functions */
void gst_pad_push (GstPad *pad, GstBuffer *buf); void gst_pad_push (GstPad *pad, GstData *data);
GstBuffer* gst_pad_pull (GstPad *pad); GstData* gst_pad_pull (GstPad *pad);
gboolean gst_pad_send_event (GstPad *pad, GstEvent *event); gboolean gst_pad_send_event (GstPad *pad, GstEvent *event);
gboolean gst_pad_event_default (GstPad *pad, GstEvent *event); gboolean gst_pad_event_default (GstPad *pad, GstEvent *event);
GstPad* gst_pad_select (GList *padlist); GstPad* gst_pad_select (GList *padlist);

View file

@ -72,8 +72,8 @@ static void gst_queue_set_property (GObject *object, guint prop_id,
static void gst_queue_get_property (GObject *object, guint prop_id, static void gst_queue_get_property (GObject *object, guint prop_id,
GValue *value, GParamSpec *pspec); GValue *value, GParamSpec *pspec);
static void gst_queue_chain (GstPad *pad, GstBuffer *buf); static void gst_queue_chain (GstPad *pad, GstData *data);
static GstBuffer * gst_queue_get (GstPad *pad); static GstData * gst_queue_get (GstPad *pad);
static GstBufferPool* gst_queue_get_bufferpool (GstPad *pad); static GstBufferPool* gst_queue_get_bufferpool (GstPad *pad);
static gboolean gst_queue_handle_src_event (GstPad *pad, GstEvent *event); static gboolean gst_queue_handle_src_event (GstPad *pad, GstEvent *event);
@ -300,13 +300,13 @@ gst_queue_locked_flush (GstQueue *queue)
} }
static void static void
gst_queue_chain (GstPad *pad, GstBuffer *buf) gst_queue_chain (GstPad *pad, GstData *data)
{ {
GstQueue *queue; GstQueue *queue;
g_return_if_fail (pad != NULL); g_return_if_fail (pad != NULL);
g_return_if_fail (GST_IS_PAD (pad)); g_return_if_fail (GST_IS_PAD (pad));
g_return_if_fail (buf != NULL); g_return_if_fail (data != NULL);
queue = GST_QUEUE (GST_OBJECT_PARENT (pad)); queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
@ -319,33 +319,35 @@ gst_queue_chain (GstPad *pad, GstBuffer *buf)
GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "event sent\n"); GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "event sent\n");
} }
g_async_queue_unlock(queue->events); g_async_queue_unlock(queue->events);
restart: restart:
/* we have to lock the queue since we span threads */ /* we have to lock the queue since we span threads */
GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, "locking t:%p", g_thread_self ()); GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, "locking t:%p", g_thread_self ());
g_mutex_lock (queue->qlock); g_mutex_lock (queue->qlock);
GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, "locked t:%p", g_thread_self ()); GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, "locked t:%p", g_thread_self ());
/* assume don't need to flush this buffer when the queue is filled */ /* assume don't need to flush this buffer when the queue is filled */
queue->flush = FALSE; queue->flush = FALSE;
if (GST_IS_EVENT (buf)) { if (GST_IS_EVENT (data)) {
switch (GST_EVENT_TYPE (buf)) { switch (GST_EVENT_TYPE (data)) {
case GST_EVENT_FLUSH: case GST_EVENT_FLUSH:
GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "FLUSH event, flushing queue\n"); GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "FLUSH event, flushing queue\n");
gst_queue_locked_flush (queue); gst_queue_locked_flush (queue);
break; break;
case GST_EVENT_EOS: case GST_EVENT_EOS:
GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "eos in on %s %d\n", GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "eos in on %s %d\n",
GST_ELEMENT_NAME (queue), queue->level_buffers); GST_ELEMENT_NAME (queue), queue->level_buffers);
break; break;
default: default:
/* we put the event in the queue, we don't have to act ourselves */ /* we put the event in the queue, we don't have to act ourselves */
break; break;
} }
} }
GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, "adding buffer %p of size %d",buf,GST_BUFFER_SIZE(buf)); if (GST_IS_BUFFER (data))
GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue,
"adding buffer %p of size %d", data, GST_BUFFER_SIZE (data));
if (queue->level_buffers == queue->size_buffers) { if (queue->level_buffers == queue->size_buffers) {
g_mutex_unlock (queue->qlock); g_mutex_unlock (queue->qlock);
@ -358,10 +360,10 @@ restart:
/* if we leak on the upstream side, drop the current buffer */ /* if we leak on the upstream side, drop the current buffer */
if (queue->leaky == GST_QUEUE_LEAK_UPSTREAM) { if (queue->leaky == GST_QUEUE_LEAK_UPSTREAM) {
GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "queue is full, leaking buffer on upstream end"); GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "queue is full, leaking buffer on upstream end");
if (GST_IS_EVENT (buf)) if (GST_IS_EVENT (data))
fprintf(stderr, "Error: queue [%s] leaked an event, type:%d\n", fprintf(stderr, "Error: queue [%s] leaked an event, type:%d\n",
GST_ELEMENT_NAME(GST_ELEMENT(queue)), GST_ELEMENT_NAME(GST_ELEMENT(queue)),
GST_EVENT_TYPE(GST_EVENT(buf))); GST_EVENT_TYPE(GST_EVENT(data)));
/* now we have to clean up and exit right away */ /* now we have to clean up and exit right away */
g_mutex_unlock (queue->qlock); g_mutex_unlock (queue->qlock);
goto out_unref; goto out_unref;
@ -369,21 +371,23 @@ restart:
/* otherwise we have to push a buffer off the other end */ /* otherwise we have to push a buffer off the other end */
else { else {
gpointer front; gpointer front;
GstBuffer *leakbuf; GstData *leak;
GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "queue is full, leaking buffer on downstream end"); GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "queue is full, leaking buffer on downstream end");
front = g_queue_pop_head (queue->queue); front = g_queue_pop_head (queue->queue);
leakbuf = (GstBuffer *)(front); leak = GST_DATA (front);
if (GST_IS_EVENT (leakbuf)) { queue->level_buffers--;
if (GST_IS_EVENT (leak)) {
fprintf(stderr, "Error: queue [%s] leaked an event, type:%d\n", fprintf(stderr, "Error: queue [%s] leaked an event, type:%d\n",
GST_ELEMENT_NAME(GST_ELEMENT(queue)), GST_ELEMENT_NAME(GST_ELEMENT(queue)),
GST_EVENT_TYPE(GST_EVENT(leakbuf))); GST_EVENT_TYPE(GST_EVENT(leak)));
} } else {
queue->level_buffers--; queue->level_bytes -= GST_BUFFER_SIZE(leak);
queue->level_bytes -= GST_BUFFER_SIZE(leakbuf); }
gst_data_unref (GST_DATA (leakbuf));
gst_data_unref (leak);
} }
} }
@ -412,7 +416,7 @@ restart:
/* try to signal to resolve the error */ /* try to signal to resolve the error */
if (!queue->may_deadlock) { if (!queue->may_deadlock) {
g_mutex_unlock (queue->qlock); g_mutex_unlock (queue->qlock);
gst_data_unref (GST_DATA (buf)); gst_data_unref (data);
gst_element_error (GST_ELEMENT (queue), "deadlock found, source pad elements are shut down"); gst_element_error (GST_ELEMENT (queue), "deadlock found, source pad elements are shut down");
/* we don't want to goto out_unref here, since we want to clean up before calling gst_element_error */ /* we don't want to goto out_unref here, since we want to clean up before calling gst_element_error */
return; return;
@ -428,14 +432,15 @@ restart:
GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "got not_full signal"); GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "got not_full signal");
} }
GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, "post full wait, level:%d/%d buffers, %d bytes", GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, "post full wait, level:%d/%d buffers, %d bytes",
queue->level_buffers, queue->size_buffers, queue->level_bytes); queue->level_buffers, queue->size_buffers, queue->level_bytes);
} }
/* put the buffer on the tail of the list */ /* put the buffer on the tail of the list */
g_queue_push_tail (queue->queue, buf); g_queue_push_tail (queue->queue, data);
queue->level_buffers++; queue->level_buffers++;
queue->level_bytes += GST_BUFFER_SIZE(buf); if (GST_IS_BUFFER (data))
queue->level_bytes += GST_BUFFER_SIZE (data);
/* this assertion _has_ to hold */ /* this assertion _has_ to hold */
g_assert (queue->queue->length == queue->level_buffers); g_assert (queue->queue->length == queue->level_buffers);
@ -451,15 +456,15 @@ restart:
return; return;
out_unref: out_unref:
gst_data_unref (GST_DATA (buf)); gst_data_unref (data);
return; return;
} }
static GstBuffer * static GstData *
gst_queue_get (GstPad *pad) gst_queue_get (GstPad *pad)
{ {
GstQueue *queue; GstQueue *queue;
GstBuffer *buf = NULL; GstData *data = NULL;
gpointer front; gpointer front;
g_assert(pad != NULL); g_assert(pad != NULL);
@ -486,7 +491,7 @@ restart:
GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "interrupted!!"); GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "interrupted!!");
g_mutex_unlock (queue->qlock); g_mutex_unlock (queue->qlock);
if (gst_scheduler_interrupt (gst_pad_get_scheduler (queue->srcpad), GST_ELEMENT (queue))) if (gst_scheduler_interrupt (gst_pad_get_scheduler (queue->srcpad), GST_ELEMENT (queue)))
return GST_BUFFER (gst_event_new (GST_EVENT_INTERRUPT)); return GST_DATA (gst_event_new (GST_EVENT_INTERRUPT));
goto restart; goto restart;
} }
if (GST_STATE (queue) != GST_STATE_PLAYING) { if (GST_STATE (queue) != GST_STATE_PLAYING) {
@ -512,7 +517,7 @@ restart:
if (!g_cond_timed_wait (queue->not_empty, queue->qlock, &timeout)){ if (!g_cond_timed_wait (queue->not_empty, queue->qlock, &timeout)){
g_mutex_unlock (queue->qlock); g_mutex_unlock (queue->qlock);
g_warning ("filler"); g_warning ("filler");
return GST_BUFFER(gst_event_new_filler()); return GST_DATA (gst_event_new_filler());
} }
} }
else { else {
@ -524,11 +529,12 @@ restart:
queue->level_buffers, queue->size_buffers, queue->level_bytes); queue->level_buffers, queue->size_buffers, queue->level_bytes);
front = g_queue_pop_head (queue->queue); front = g_queue_pop_head (queue->queue);
buf = (GstBuffer *)(front); data = GST_DATA (front);
GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, "retrieved buffer %p from queue", buf); GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, "retrieved data %p from queue", data);
queue->level_buffers--; queue->level_buffers--;
queue->level_bytes -= GST_BUFFER_SIZE(buf); if (GST_IS_BUFFER (data))
queue->level_bytes -= GST_BUFFER_SIZE (data);
GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, "(%s:%s)- level:%d/%d buffers, %d bytes", GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, "(%s:%s)- level:%d/%d buffers, %d bytes",
GST_DEBUG_PAD_NAME(pad), GST_DEBUG_PAD_NAME(pad),
@ -543,8 +549,8 @@ restart:
g_mutex_unlock (queue->qlock); g_mutex_unlock (queue->qlock);
/* FIXME where should this be? locked? */ /* FIXME where should this be? locked? */
if (GST_IS_EVENT(buf)) { if (GST_IS_EVENT (data)) {
GstEvent *event = GST_EVENT(buf); GstEvent *event = GST_EVENT (data);
switch (GST_EVENT_TYPE(event)) { switch (GST_EVENT_TYPE(event)) {
case GST_EVENT_EOS: case GST_EVENT_EOS:
GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "queue \"%s\" eos", GST_ELEMENT_NAME (queue)); GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "queue \"%s\" eos", GST_ELEMENT_NAME (queue));
@ -555,7 +561,7 @@ restart:
} }
} }
return buf; return data;
} }

View file

@ -35,7 +35,7 @@ GST_DEBUG_CATEGORY_STATIC(debug_scheduler);
typedef struct _GstSchedulerChain GstSchedulerChain; typedef struct _GstSchedulerChain GstSchedulerChain;
#define GST_ELEMENT_THREADSTATE(elem) (cothread*) (GST_ELEMENT_CAST (elem)->sched_private) #define GST_ELEMENT_THREADSTATE(elem) (cothread*) (GST_ELEMENT_CAST (elem)->sched_private)
#define GST_RPAD_BUFPEN(pad) (GstBuffer*) (GST_REAL_PAD_CAST(pad)->sched_private) #define GST_RPAD_BUFPEN(pad) (GstData*) (GST_REAL_PAD_CAST(pad)->sched_private)
#define GST_ELEMENT_COTHREAD_STOPPING GST_ELEMENT_SCHEDULER_PRIVATE1 #define GST_ELEMENT_COTHREAD_STOPPING GST_ELEMENT_SCHEDULER_PRIVATE1
#define GST_ELEMENT_IS_COTHREAD_STOPPING(element) GST_FLAG_IS_SET((element), GST_ELEMENT_COTHREAD_STOPPING) #define GST_ELEMENT_IS_COTHREAD_STOPPING(element) GST_FLAG_IS_SET((element), GST_ELEMENT_COTHREAD_STOPPING)
@ -333,19 +333,19 @@ gst_basic_scheduler_chain_wrapper (int argc, char **argv)
if (GST_RPAD_DIRECTION (realpad) == GST_PAD_SINK && if (GST_RPAD_DIRECTION (realpad) == GST_PAD_SINK &&
GST_PAD_IS_LINKED (realpad)) { GST_PAD_IS_LINKED (realpad)) {
GstBuffer *buf; GstData *data;
GST_CAT_DEBUG (debug_dataflow, "pulling data from %s:%s", name, GST_CAT_DEBUG (debug_dataflow, "pulling data from %s:%s", name,
GST_PAD_NAME (pad)); GST_PAD_NAME (pad));
buf = gst_pad_pull (pad); data = gst_pad_pull (pad);
if (buf) { if (data) {
if (GST_IS_EVENT (buf) && !GST_ELEMENT_IS_EVENT_AWARE (element)) { if (GST_IS_EVENT (data) && !GST_ELEMENT_IS_EVENT_AWARE (element)) {
gst_pad_send_event (pad, GST_EVENT (buf)); gst_pad_send_event (pad, GST_EVENT (data));
} }
else { else {
GST_CAT_DEBUG (debug_dataflow, "calling chain function of %s:%s %p", GST_CAT_DEBUG (debug_dataflow, "calling chain function of %s:%s %p",
name, GST_PAD_NAME (pad), buf); name, GST_PAD_NAME (pad), data);
GST_RPAD_CHAINFUNC (realpad) (pad, buf); GST_RPAD_CHAINFUNC (realpad) (pad, data);
GST_CAT_DEBUG (debug_dataflow, GST_CAT_DEBUG (debug_dataflow,
"calling chain function of element %s done", name); "calling chain function of element %s done", name);
} }
@ -377,7 +377,7 @@ gst_basic_scheduler_src_wrapper (int argc, char **argv)
GstElement *element = GST_ELEMENT_CAST (argv); GstElement *element = GST_ELEMENT_CAST (argv);
GList *pads; GList *pads;
GstRealPad *realpad; GstRealPad *realpad;
GstBuffer *buf = NULL; GstData *data = NULL;
G_GNUC_UNUSED const gchar *name = GST_ELEMENT_NAME (element); G_GNUC_UNUSED const gchar *name = GST_ELEMENT_NAME (element);
GST_DEBUG ("entering src wrapper of element %s", name); GST_DEBUG ("entering src wrapper of element %s", name);
@ -395,11 +395,11 @@ gst_basic_scheduler_src_wrapper (int argc, char **argv)
if (GST_RPAD_DIRECTION (realpad) == GST_PAD_SRC && GST_PAD_IS_USABLE (realpad)) { if (GST_RPAD_DIRECTION (realpad) == GST_PAD_SRC && GST_PAD_IS_USABLE (realpad)) {
GST_CAT_DEBUG (debug_dataflow, "calling _getfunc for %s:%s", GST_DEBUG_PAD_NAME (realpad)); GST_CAT_DEBUG (debug_dataflow, "calling _getfunc for %s:%s", GST_DEBUG_PAD_NAME (realpad));
g_return_val_if_fail (GST_RPAD_GETFUNC (realpad) != NULL, 0); g_return_val_if_fail (GST_RPAD_GETFUNC (realpad) != NULL, 0);
buf = GST_RPAD_GETFUNC (realpad) (GST_PAD_CAST (realpad)); data = GST_RPAD_GETFUNC (realpad) (GST_PAD_CAST (realpad));
if (buf) { if (data) {
GST_CAT_DEBUG (debug_dataflow, "calling gst_pad_push on pad %s:%s %p", GST_CAT_DEBUG (debug_dataflow, "calling gst_pad_push on pad %s:%s %p",
GST_DEBUG_PAD_NAME (realpad), buf); GST_DEBUG_PAD_NAME (realpad), data);
gst_pad_push (GST_PAD_CAST (realpad), buf); gst_pad_push (GST_PAD_CAST (realpad), data);
} }
} }
} }
@ -419,7 +419,7 @@ gst_basic_scheduler_src_wrapper (int argc, char **argv)
} }
static void static void
gst_basic_scheduler_chainhandler_proxy (GstPad * pad, GstBuffer * buf) gst_basic_scheduler_chainhandler_proxy (GstPad * pad, GstData * data)
{ {
gint loop_count = 100; gint loop_count = 100;
GstElement *parent; GstElement *parent;
@ -429,7 +429,7 @@ gst_basic_scheduler_chainhandler_proxy (GstPad * pad, GstBuffer * buf)
peer = GST_RPAD_PEER (pad); peer = GST_RPAD_PEER (pad);
GST_DEBUG ("entered chainhandler proxy of %s:%s", GST_DEBUG_PAD_NAME (pad)); GST_DEBUG ("entered chainhandler proxy of %s:%s", GST_DEBUG_PAD_NAME (pad));
GST_CAT_DEBUG (debug_dataflow, "putting buffer %p in peer \"%s:%s\"'s pen", buf, GST_CAT_DEBUG (debug_dataflow, "putting buffer %p in peer \"%s:%s\"'s pen", data,
GST_DEBUG_PAD_NAME (peer)); GST_DEBUG_PAD_NAME (peer));
/* /*
@ -459,9 +459,9 @@ gst_basic_scheduler_chainhandler_proxy (GstPad * pad, GstBuffer * buf)
g_assert (GST_RPAD_BUFPEN (GST_RPAD_PEER (pad)) == NULL); g_assert (GST_RPAD_BUFPEN (GST_RPAD_PEER (pad)) == NULL);
/* now fill the bufferpen and switch so it can be consumed */ /* now fill the bufferpen and switch so it can be consumed */
GST_RPAD_BUFPEN (GST_RPAD_PEER (pad)) = buf; GST_RPAD_BUFPEN (GST_RPAD_PEER (pad)) = data;
GST_CAT_DEBUG (debug_dataflow, "switching to %p to consume buffer %p", GST_CAT_DEBUG (debug_dataflow, "switching to %p to consume buffer %p",
GST_ELEMENT_THREADSTATE (GST_PAD_PARENT (pad)), buf); GST_ELEMENT_THREADSTATE (GST_PAD_PARENT (pad)), data);
do_element_switch (parent); do_element_switch (parent);
@ -469,18 +469,18 @@ gst_basic_scheduler_chainhandler_proxy (GstPad * pad, GstBuffer * buf)
} }
static void static void
gst_basic_scheduler_select_proxy (GstPad * pad, GstBuffer * buf) gst_basic_scheduler_select_proxy (GstPad * pad, GstData * data)
{ {
GstElement *parent; GstElement *parent;
parent = GST_PAD_PARENT (pad); parent = GST_PAD_PARENT (pad);
GST_CAT_DEBUG (debug_dataflow, "putting buffer %p in peer's pen of pad %s:%s", GST_CAT_DEBUG (debug_dataflow, "putting buffer %p in peer's pen of pad %s:%s",
buf, GST_DEBUG_PAD_NAME (pad)); data, GST_DEBUG_PAD_NAME (pad));
g_assert (GST_RPAD_BUFPEN (GST_RPAD_PEER (pad)) == NULL); g_assert (GST_RPAD_BUFPEN (GST_RPAD_PEER (pad)) == NULL);
/* now fill the bufferpen and switch so it can be consumed */ /* now fill the bufferpen and switch so it can be consumed */
GST_RPAD_BUFPEN (GST_RPAD_PEER (pad)) = buf; GST_RPAD_BUFPEN (GST_RPAD_PEER (pad)) = data;
GST_CAT_DEBUG (debug_dataflow, "switching to %p", GST_CAT_DEBUG (debug_dataflow, "switching to %p",
GST_ELEMENT_THREADSTATE (parent)); GST_ELEMENT_THREADSTATE (parent));
/* FIXME temporarily diabled */ /* FIXME temporarily diabled */
@ -492,10 +492,10 @@ gst_basic_scheduler_select_proxy (GstPad * pad, GstBuffer * buf)
} }
static GstBuffer * static GstData *
gst_basic_scheduler_gethandler_proxy (GstPad * pad) gst_basic_scheduler_gethandler_proxy (GstPad * pad)
{ {
GstBuffer *buf; GstData *data;
GstElement *parent; GstElement *parent;
GstRealPad *peer; GstRealPad *peer;
@ -528,12 +528,12 @@ gst_basic_scheduler_gethandler_proxy (GstPad * pad)
GST_CAT_DEBUG (debug_dataflow, "done switching"); GST_CAT_DEBUG (debug_dataflow, "done switching");
/* now grab the buffer from the pen, clear the pen, and return the buffer */ /* now grab the buffer from the pen, clear the pen, and return the buffer */
buf = GST_RPAD_BUFPEN (pad); data = GST_RPAD_BUFPEN (pad);
GST_RPAD_BUFPEN (pad) = NULL; GST_RPAD_BUFPEN (pad) = NULL;
GST_DEBUG ("leaving gethandler proxy of %s:%s", GST_DEBUG_PAD_NAME (pad)); GST_DEBUG ("leaving gethandler proxy of %s:%s", GST_DEBUG_PAD_NAME (pad));
return buf; return data;
} }
static gboolean static gboolean
@ -559,7 +559,7 @@ gst_basic_scheduler_eventhandler_proxy (GstPad *srcpad, GstEvent *event)
} }
if (flush) { if (flush) {
GstData *data = GST_DATA (GST_RPAD_BUFPEN (srcpad)); GstData *data = GST_RPAD_BUFPEN (srcpad);
GST_INFO ("event is flush"); GST_INFO ("event is flush");

View file

@ -117,6 +117,7 @@ struct _GstOptSchedulerChain {
gint num_groups; gint num_groups;
gint num_enabled; gint num_enabled;
}; };
/* /*
* elements that are scheduled in one cothread * elements that are scheduled in one cothread
*/ */
@ -187,7 +188,9 @@ struct _GstOptSchedulerGroup {
/* some group operations */ /* some group operations */
static GstOptSchedulerGroup* ref_group (GstOptSchedulerGroup *group); static GstOptSchedulerGroup* ref_group (GstOptSchedulerGroup *group);
#ifndef USE_COTHREADS #ifndef USE_COTHREADS
/*
static GstOptSchedulerGroup* ref_group_by_count (GstOptSchedulerGroup *group, gint count); static GstOptSchedulerGroup* ref_group_by_count (GstOptSchedulerGroup *group, gint count);
*/
#endif #endif
static GstOptSchedulerGroup* unref_group (GstOptSchedulerGroup *group); static GstOptSchedulerGroup* unref_group (GstOptSchedulerGroup *group);
static void destroy_group (GstOptSchedulerGroup *group); static void destroy_group (GstOptSchedulerGroup *group);
@ -253,67 +256,6 @@ static void gst_opt_scheduler_show (GstScheduler *sched);
static GstSchedulerClass *parent_class = NULL; static GstSchedulerClass *parent_class = NULL;
/* debug functions */
static void
gst_opt_scheduler_group_debug (GstOptSchedulerGroup *group)
{
GSList *el;
GstElement *e;
gchar *string;
gchar *name;
gchar *s;
el = group->elements;
if (!el) return;
e = GST_ELEMENT (el->data);
string = g_strdup (GST_ELEMENT_NAME (e));
el = el->next;
while (el)
{
e = GST_ELEMENT (el->data);
s = string;
name = g_strdup_printf (",%s", GST_ELEMENT_NAME (e));
string = g_strconcat (s, name, NULL);
g_free (s);
g_free (name);
el = el->next;
}
GST_DEBUG ("scheduler group %p: %s", group, string);
g_free (string);
}
static void
gst_opt_scheduler_chain_debug (GstOptSchedulerChain *chain, const gchar *label)
{
GSList *group = NULL;
GST_DEBUG ("starting opt scheduler chain debug: %s", label);
GST_DEBUG ("refcount %d, num_groups %d, num_enabled %d",
chain->refcount, chain->num_groups, chain->num_enabled);
GST_DEBUG ("scheduler %p", chain->sched);
group = chain->groups;
while (group)
{
gst_opt_scheduler_group_debug ((GstOptSchedulerGroup *) group->data);
group = group->next;
}
GST_DEBUG ("finished caps debug");
}
#ifndef USE_COTHREADS
static void
gst_opt_scheduler_debug (GstOptScheduler *osched, const gchar *label)
{
GST_INFO ("%s:debugging scheduler run queue with recursion %d and length %d",
label, osched->recursion, g_list_length (osched->runqueue));
g_list_foreach (osched->runqueue, (GFunc) gst_opt_scheduler_group_debug, NULL);
}
#endif
static GType static GType
gst_opt_scheduler_get_type (void) gst_opt_scheduler_get_type (void)
{ {
@ -654,6 +596,7 @@ ref_group (GstOptSchedulerGroup *group)
} }
#ifndef USE_COTHREADS #ifndef USE_COTHREADS
/* remove me
static GstOptSchedulerGroup* static GstOptSchedulerGroup*
ref_group_by_count (GstOptSchedulerGroup *group, gint count) ref_group_by_count (GstOptSchedulerGroup *group, gint count)
{ {
@ -664,6 +607,7 @@ ref_group_by_count (GstOptSchedulerGroup *group, gint count)
return group; return group;
} }
*/
#endif #endif
static GstOptSchedulerGroup* static GstOptSchedulerGroup*
@ -866,7 +810,6 @@ group_element_set_enabled (GstOptSchedulerGroup *group, GstElement *element, gbo
static gboolean static gboolean
schedule_group (GstOptSchedulerGroup *group) schedule_group (GstOptSchedulerGroup *group)
{ {
g_assert (group != NULL);
if (!group->entry) { if (!group->entry) {
GST_INFO ( "not scheduling group %p without entry", group); GST_INFO ( "not scheduling group %p without entry", group);
return FALSE; return FALSE;
@ -911,8 +854,7 @@ schedule_group (GstOptSchedulerGroup *group)
static void static void
gst_opt_scheduler_schedule_run_queue (GstOptScheduler *osched) gst_opt_scheduler_schedule_run_queue (GstOptScheduler *osched)
{ {
GST_LOG_OBJECT (osched, GST_LOG_OBJECT (osched, "entering scheduler run queue recursion %d %d",
"entering scheduler run queue (recursion %d, length %d)",
osched->recursion, g_list_length (osched->runqueue)); osched->recursion, g_list_length (osched->runqueue));
/* make sure we don't exceed max_recursion */ /* make sure we don't exceed max_recursion */
@ -926,24 +868,12 @@ gst_opt_scheduler_schedule_run_queue (GstOptScheduler *osched)
while (osched->runqueue) { while (osched->runqueue) {
GstOptSchedulerGroup *group; GstOptSchedulerGroup *group;
gboolean res; gboolean res;
GList *p;
group = (GstOptSchedulerGroup *) osched->runqueue->data; group = (GstOptSchedulerGroup *) osched->runqueue->data;
/* Get the next group that isn't disabled */ /* runqueue hols refcount to group */
p = osched->runqueue;
while (GST_OPT_SCHEDULER_GROUP_IS_DISABLED (group)) {
p = p->next;
if (p == NULL) {
return;
}
group = (GstOptSchedulerGroup *) p->data;
}
/* runqueue holds refcount to group */
gst_opt_scheduler_debug (osched, "scheduler runqueue loop");
osched->runqueue = g_list_remove (osched->runqueue, group); osched->runqueue = g_list_remove (osched->runqueue, group);
GST_LOG_OBJECT (osched, "scheduling group %p", group); GST_LOG_OBJECT (osched, "scheduling group %p", group);
res = schedule_group (group); res = schedule_group (group);
@ -957,9 +887,7 @@ gst_opt_scheduler_schedule_run_queue (GstOptScheduler *osched)
unref_group (group); unref_group (group);
} }
#if 0
GST_LOG_OBJECT (osched, "run queue length after scheduling %d", g_list_length (osched->runqueue)); GST_LOG_OBJECT (osched, "run queue length after scheduling %d", g_list_length (osched->runqueue));
#endif
osched->recursion--; osched->recursion--;
} }
@ -975,8 +903,6 @@ schedule_chain (GstOptSchedulerChain *chain)
osched = chain->sched; osched = chain->sched;
groups = chain->groups; groups = chain->groups;
GST_DEBUG ("scheduling chain %p with %d groups",
chain, g_slist_length (groups));
while (groups) { while (groups) {
GstOptSchedulerGroup *group = (GstOptSchedulerGroup *) groups->data; GstOptSchedulerGroup *group = (GstOptSchedulerGroup *) groups->data;
@ -989,13 +915,12 @@ schedule_chain (GstOptSchedulerChain *chain)
schedule_group (group); schedule_group (group);
#else #else
osched->recursion = 0; osched->recursion = 0;
if (osched->runqueue == NULL || group != osched->runqueue->data) { if (!g_list_find (osched->runqueue, group))
ref_group (group); {
osched->runqueue = g_list_append (osched->runqueue, group); ref_group (group);
osched->runqueue = g_list_append (osched->runqueue, group);
} }
GST_LOG ("calling scheduler_run_queue on %p", osched);
gst_opt_scheduler_schedule_run_queue (osched); gst_opt_scheduler_schedule_run_queue (osched);
GST_LOG ("calling scheduler_run_queue on %p done", osched);
#endif #endif
GST_LOG ("done scheduling group %p in chain %p", GST_LOG ("done scheduling group %p in chain %p",
@ -1006,7 +931,6 @@ schedule_chain (GstOptSchedulerChain *chain)
groups = g_slist_next (groups); groups = g_slist_next (groups);
} }
GST_DEBUG ("done scheduling chain %p", chain);
} }
/* a get-based group is scheduled by getting a buffer from the get based /* a get-based group is scheduled by getting a buffer from the get based
@ -1025,7 +949,7 @@ get_group_schedule_function (int argc, char *argv[])
group->flags |= GST_OPT_SCHEDULER_GROUP_RUNNING; group->flags |= GST_OPT_SCHEDULER_GROUP_RUNNING;
while (pads) { while (pads) {
GstBuffer *buffer; GstData *data;
GstPad *pad = GST_PAD_CAST (pads->data); GstPad *pad = GST_PAD_CAST (pads->data);
pads = g_list_next (pads); pads = g_list_next (pads);
@ -1036,13 +960,13 @@ get_group_schedule_function (int argc, char *argv[])
GST_LOG ("doing get and push on pad \"%s:%s\" in group %p", GST_LOG ("doing get and push on pad \"%s:%s\" in group %p",
GST_DEBUG_PAD_NAME (pad), group); GST_DEBUG_PAD_NAME (pad), group);
buffer = GST_RPAD_GETFUNC (pad) (pad); data = GST_RPAD_GETFUNC (pad) (pad);
if (buffer) { if (data) {
if (GST_EVENT_IS_INTERRUPT (buffer)) { if (GST_EVENT_IS_INTERRUPT (data)) {
gst_event_unref (GST_EVENT (buffer)); gst_event_unref (GST_EVENT (data));
break; break;
} }
gst_pad_push (pad, buffer); gst_pad_push (pad, data);
} }
} }
@ -1095,7 +1019,7 @@ unknown_group_schedule_function (int argc, char *argv[])
* link performs a push to the loop element. We then schedule the * link performs a push to the loop element. We then schedule the
* group with the loop-based element until the bufpen is empty */ * group with the loop-based element until the bufpen is empty */
static void static void
gst_opt_scheduler_loop_wrapper (GstPad *sinkpad, GstBuffer *buffer) gst_opt_scheduler_loop_wrapper (GstPad *sinkpad, GstData *data)
{ {
GstOptSchedulerGroup *group; GstOptSchedulerGroup *group;
GstOptScheduler *osched; GstOptScheduler *osched;
@ -1112,36 +1036,33 @@ gst_opt_scheduler_loop_wrapper (GstPad *sinkpad, GstBuffer *buffer)
group_error_handler (group); group_error_handler (group);
} }
else { else {
GST_PAD_BUFLIST (GST_RPAD_PEER (sinkpad)) = g_list_append (GST_PAD_BUFLIST (GST_RPAD_PEER (sinkpad)), buffer); GST_PAD_BUFLIST (GST_RPAD_PEER (sinkpad)) = g_list_append (GST_PAD_BUFLIST (GST_RPAD_PEER (sinkpad)), data);
schedule_group (group); schedule_group (group);
} }
#else #else
GST_PAD_BUFLIST (GST_RPAD_PEER (sinkpad)) = g_list_append (GST_PAD_BUFLIST (GST_RPAD_PEER (sinkpad)), buffer); GST_PAD_BUFLIST (GST_RPAD_PEER (sinkpad)) = g_list_append (GST_PAD_BUFLIST (GST_RPAD_PEER (sinkpad)), data);
if (!(group->flags & GST_OPT_SCHEDULER_GROUP_RUNNING)) { if (!(group->flags & GST_OPT_SCHEDULER_GROUP_RUNNING)) {
/* Only schedule the group if it's not already scheduled */ GST_LOG ("adding %p to runqueue", group);
if (osched->runqueue == NULL || osched->runqueue->data != group) { if (!g_list_find (osched->runqueue, group))
{
ref_group (group); ref_group (group);
GST_LOG ("adding %p to runqueue", group);
osched->runqueue = g_list_append (osched->runqueue, group); osched->runqueue = g_list_append (osched->runqueue, group);
} }
} }
#endif #endif
#if 0
GST_LOG ("after loop wrapper buflist %d", GST_LOG ("after loop wrapper buflist %d",
g_list_length (GST_PAD_BUFLIST (GST_RPAD_PEER (sinkpad)))); g_list_length (GST_PAD_BUFLIST (GST_RPAD_PEER (sinkpad))));
#endif
} }
/* this function is called by a loop based element that performs a /* this function is called by a loop based element that performs a
* pull on a sinkpad. We schedule the peer group until the bufpen * pull on a sinkpad. We schedule the peer group until the bufpen
* is filled with the buffer so that this function can return */ * is filled with the buffer so that this function can return */
static GstBuffer* static GstData*
gst_opt_scheduler_get_wrapper (GstPad *srcpad) gst_opt_scheduler_get_wrapper (GstPad *srcpad)
{ {
GstBuffer *buffer; GstData *data;
GstPad *sinkpad; GstOptSchedulerGroup *group;
GstOptSchedulerGroup *group, *peergroup;
GstOptScheduler *osched; GstOptScheduler *osched;
gboolean disabled; gboolean disabled;
@ -1149,24 +1070,19 @@ gst_opt_scheduler_get_wrapper (GstPad *srcpad)
/* first try to grab a queued buffer */ /* first try to grab a queued buffer */
if (GST_PAD_BUFLIST (srcpad)) { if (GST_PAD_BUFLIST (srcpad)) {
buffer = GST_PAD_BUFLIST (srcpad)->data; data = GST_PAD_BUFLIST (srcpad)->data;
GST_PAD_BUFLIST (srcpad) = g_list_remove (GST_PAD_BUFLIST (srcpad), buffer); GST_PAD_BUFLIST (srcpad) = g_list_remove (GST_PAD_BUFLIST (srcpad), data);
GST_LOG ("get wrapper, returning queued buffer"); GST_LOG ("get wrapper, returning queued data %d",
g_list_length (GST_PAD_BUFLIST (srcpad)));
return buffer; return data;
} }
sinkpad = gst_pad_get_peer (srcpad);
peergroup = GST_ELEMENT_SCHED_GROUP (GST_PAD_PARENT (sinkpad));
/* Disable the loop group */
peergroup->flags &= ~GST_OPT_SCHEDULER_GROUP_RUNNING;
peergroup->flags |= GST_OPT_SCHEDULER_GROUP_DISABLED;
/* else we need to schedule the peer element */ /* else we need to schedule the peer element */
group = GST_ELEMENT_SCHED_GROUP (GST_PAD_PARENT (srcpad)); group = GST_ELEMENT_SCHED_GROUP (GST_PAD_PARENT (srcpad));
osched = group->chain->sched; osched = group->chain->sched;
buffer = NULL; data = NULL;
disabled = FALSE; disabled = FALSE;
do { do {
@ -1174,10 +1090,14 @@ gst_opt_scheduler_get_wrapper (GstPad *srcpad)
schedule_group (group); schedule_group (group);
#else #else
if (!(group->flags & GST_OPT_SCHEDULER_GROUP_RUNNING)) { if (!(group->flags & GST_OPT_SCHEDULER_GROUP_RUNNING)) {
ref_group_by_count (group, 2); ref_group (group);
if (!g_list_find (osched->runqueue, group))
{
ref_group (group);
osched->runqueue = g_list_append (osched->runqueue, group);
}
gst_opt_scheduler_debug (osched, "scheduler debug");
osched->runqueue = g_list_append (osched->runqueue, group);
GST_LOG_OBJECT (osched, "recursing into scheduler group %p", group); GST_LOG_OBJECT (osched, "recursing into scheduler group %p", group);
gst_opt_scheduler_schedule_run_queue (osched); gst_opt_scheduler_schedule_run_queue (osched);
GST_LOG_OBJECT (osched, "return from recurse group %p", group); GST_LOG_OBJECT (osched, "return from recurse group %p", group);
@ -1192,52 +1112,48 @@ gst_opt_scheduler_get_wrapper (GstPad *srcpad)
} }
} }
else { else {
/* in this case, the group was running and we wanted to swich to it, /* in this case, the group was running and we wanted to swtich to it,
* this is not allowed in the optimal scheduler (yet) */ * this is not allowed in the optimal scheduler (yet) */
g_warning ("deadlock detected, disabling group %p", group); g_warning ("deadlock detected, disabling group %p", group);
group_error_handler (group); group_error_handler (group);
return GST_BUFFER (gst_event_new (GST_EVENT_INTERRUPT)); return GST_DATA (gst_event_new (GST_EVENT_INTERRUPT));
} }
#endif #endif
/* Reenable our loop group */
peergroup->flags &= ~GST_OPT_SCHEDULER_GROUP_DISABLED;
peergroup->flags |= GST_OPT_SCHEDULER_GROUP_RUNNING;
/* if the scheduler interrupted, make sure we send an INTERRUPTED event to the /* if the scheduler interrupted, make sure we send an INTERRUPTED event to the
* loop based element */ * loop based element */
if (osched->state == GST_OPT_SCHEDULER_STATE_INTERRUPTED) { if (osched->state == GST_OPT_SCHEDULER_STATE_INTERRUPTED) {
GST_INFO ( "scheduler interrupted, return interrupt event"); GST_INFO ( "scheduler interrupted, return interrupt event");
buffer = GST_BUFFER (gst_event_new (GST_EVENT_INTERRUPT)); data = GST_DATA (gst_event_new (GST_EVENT_INTERRUPT));
} }
else { else {
if (GST_PAD_BUFLIST (srcpad)) { if (GST_PAD_BUFLIST (srcpad)) {
buffer = (GstBuffer *) GST_PAD_BUFLIST (srcpad)->data; data = GST_PAD_BUFLIST (srcpad)->data;
GST_PAD_BUFLIST (srcpad) = g_list_remove (GST_PAD_BUFLIST (srcpad), buffer); GST_PAD_BUFLIST (srcpad) = g_list_remove (GST_PAD_BUFLIST (srcpad), data);
} }
else if (disabled) { else if (disabled) {
/* no buffer in queue and peer group was disabled */ /* no buffer in queue and peer group was disabled */
buffer = GST_BUFFER (gst_event_new (GST_EVENT_INTERRUPT)); data = GST_DATA (gst_event_new (GST_EVENT_INTERRUPT));
} }
} }
} }
while (buffer == NULL); while (data == NULL);
GST_LOG ("get wrapper, returning buffer %p", buffer); GST_LOG ("get wrapper, returning data %p, queue length %d",
data, g_list_length (GST_PAD_BUFLIST (srcpad)));
return buffer; return data;
} }
/* this function is a chain wrapper for non-event-aware plugins, /* this function is a chain wrapper for non-event-aware plugins,
* it'll simply dispatch the events to the (default) event handler */ * it'll simply dispatch the events to the (default) event handler */
static void static void
gst_opt_scheduler_chain_wrapper (GstPad *sinkpad, GstBuffer *buffer) gst_opt_scheduler_chain_wrapper (GstPad *sinkpad, GstData *data)
{ {
if (GST_IS_EVENT (buffer)) { if (GST_IS_EVENT (data)) {
gst_pad_send_event (sinkpad, GST_EVENT (buffer)); gst_pad_send_event (sinkpad, GST_EVENT (data));
} }
else { else {
GST_RPAD_CHAINFUNC (sinkpad) (sinkpad, buffer); GST_RPAD_CHAINFUNC (sinkpad) (sinkpad, data);
} }
} }
@ -2148,18 +2064,13 @@ gst_opt_scheduler_iterate (GstScheduler *sched)
/* we have to schedule each of the scheduler chains now */ /* we have to schedule each of the scheduler chains now */
chains = osched->chains; chains = osched->chains;
GST_INFO ("going through all chains (%d)",
g_slist_length (chains));
while (chains) { while (chains) {
GstOptSchedulerChain *chain = (GstOptSchedulerChain *) chains->data; GstOptSchedulerChain *chain = (GstOptSchedulerChain *) chains->data;
gst_opt_scheduler_chain_debug (chain, "chain loop");
ref_chain (chain); ref_chain (chain);
/* if the chain is not disabled, schedule it */ /* if the chain is not disabled, schedule it */
if (!GST_OPT_SCHEDULER_CHAIN_IS_DISABLED (chain)) { if (!GST_OPT_SCHEDULER_CHAIN_IS_DISABLED (chain)) {
GST_INFO ("chain %p enabled, scheduling", chain);
schedule_chain (chain); schedule_chain (chain);
GST_INFO ("chain %p enabled, scheduling done", chain);
scheduled = TRUE; scheduled = TRUE;
} }
@ -2178,7 +2089,6 @@ gst_opt_scheduler_iterate (GstScheduler *sched)
chains = g_slist_next (chains); chains = g_slist_next (chains);
unref_chain (chain); unref_chain (chain);
} }
GST_INFO ("done going through all chains");
/* at this point it's possible that the scheduler state is /* at this point it's possible that the scheduler state is
* in error, we then return an error */ * in error, we then return an error */

View file

@ -92,7 +92,7 @@ static void gst_aggregator_set_property (GObject *object, guint prop_id,
static void gst_aggregator_get_property (GObject *object, guint prop_id, static void gst_aggregator_get_property (GObject *object, guint prop_id,
GValue *value, GParamSpec *pspec); GValue *value, GParamSpec *pspec);
static void gst_aggregator_chain (GstPad *pad, GstBuffer *buf); static void gst_aggregator_chain (GstPad *pad, GstData *_data);
static void gst_aggregator_loop (GstElement *element); static void gst_aggregator_loop (GstElement *element);
static GstElementClass *parent_class = NULL; static GstElementClass *parent_class = NULL;
@ -286,7 +286,7 @@ gst_aggregator_push (GstAggregator *aggregator, GstPad *pad, GstBuffer *buf, guc
g_object_notify (G_OBJECT (aggregator), "last_message"); g_object_notify (G_OBJECT (aggregator), "last_message");
} }
gst_pad_push (aggregator->srcpad, buf); gst_pad_push (aggregator->srcpad, GST_DATA (buf));
} }
static void static void
@ -313,7 +313,7 @@ gst_aggregator_loop (GstElement *element)
* and that the peer pad is also enabled. * and that the peer pad is also enabled.
*/ */
if (GST_PAD_IS_USABLE (pad)) { if (GST_PAD_IS_USABLE (pad)) {
buf = gst_pad_pull (pad); buf = GST_BUFFER (gst_pad_pull (pad));
debug = "loop"; debug = "loop";
/* then push it forward */ /* then push it forward */
@ -328,7 +328,7 @@ gst_aggregator_loop (GstElement *element)
debug = "loop_select"; debug = "loop_select";
pad = gst_pad_select (aggregator->sinkpads); pad = gst_pad_select (aggregator->sinkpads);
buf = gst_pad_pull (pad); buf = GST_BUFFER (gst_pad_pull (pad));
gst_aggregator_push (aggregator, pad, buf, debug); gst_aggregator_push (aggregator, pad, buf, debug);
} }
@ -346,8 +346,9 @@ gst_aggregator_loop (GstElement *element)
* Chain a buffer on a pad. * Chain a buffer on a pad.
*/ */
static void static void
gst_aggregator_chain (GstPad *pad, GstBuffer *buf) gst_aggregator_chain (GstPad *pad, GstData *_data)
{ {
GstBuffer *buf = GST_BUFFER (_data);
GstAggregator *aggregator; GstAggregator *aggregator;
g_return_if_fail (pad != NULL); g_return_if_fail (pad != NULL);

View file

@ -102,7 +102,7 @@ static void gst_fakesink_get_property (GObject *object, guint prop_id,
static GstElementStateReturn static GstElementStateReturn
gst_fakesink_change_state (GstElement *element); gst_fakesink_change_state (GstElement *element);
static void gst_fakesink_chain (GstPad *pad, GstBuffer *buf); static void gst_fakesink_chain (GstPad *pad, GstData *_data);
static GstElementClass *parent_class = NULL; static GstElementClass *parent_class = NULL;
static guint gst_fakesink_signals[LAST_SIGNAL] = { 0 }; static guint gst_fakesink_signals[LAST_SIGNAL] = { 0 };
@ -298,8 +298,9 @@ gst_fakesink_get_property (GObject *object, guint prop_id, GValue *value, GParam
} }
static void static void
gst_fakesink_chain (GstPad *pad, GstBuffer *buf) gst_fakesink_chain (GstPad *pad, GstData *_data)
{ {
GstBuffer *buf = GST_BUFFER (_data);
GstFakeSink *fakesink; GstFakeSink *fakesink;
g_return_if_fail (pad != NULL); g_return_if_fail (pad != NULL);

View file

@ -170,7 +170,7 @@ static void gst_fakesrc_get_property (GObject *object, guint prop_id,
static GstElementStateReturn gst_fakesrc_change_state (GstElement *element); static GstElementStateReturn gst_fakesrc_change_state (GstElement *element);
static GstBuffer* gst_fakesrc_get (GstPad *pad); static GstData* gst_fakesrc_get (GstPad *pad);
static void gst_fakesrc_loop (GstElement *element); static void gst_fakesrc_loop (GstElement *element);
static GstElementClass *parent_class = NULL; static GstElementClass *parent_class = NULL;
@ -734,7 +734,7 @@ gst_fakesrc_create_buffer (GstFakeSrc *src)
return buf; return buf;
} }
static GstBuffer * static GstData *
gst_fakesrc_get(GstPad *pad) gst_fakesrc_get(GstPad *pad)
{ {
GstFakeSrc *src; GstFakeSrc *src;
@ -748,22 +748,22 @@ gst_fakesrc_get(GstPad *pad)
if (src->need_flush) { if (src->need_flush) {
src->need_flush = FALSE; src->need_flush = FALSE;
return GST_BUFFER(gst_event_new (GST_EVENT_FLUSH)); return GST_DATA(gst_event_new (GST_EVENT_FLUSH));
} }
if (src->buffer_count == src->segment_end) { if (src->buffer_count == src->segment_end) {
if (src->segment_loop) { if (src->segment_loop) {
return GST_BUFFER(gst_event_new (GST_EVENT_SEGMENT_DONE)); return GST_DATA(gst_event_new (GST_EVENT_SEGMENT_DONE));
} }
else { else {
gst_element_set_eos (GST_ELEMENT (src)); gst_element_set_eos (GST_ELEMENT (src));
return GST_BUFFER(gst_event_new (GST_EVENT_EOS)); return GST_DATA(gst_event_new (GST_EVENT_EOS));
} }
} }
if (src->rt_num_buffers == 0) { if (src->rt_num_buffers == 0) {
gst_element_set_eos (GST_ELEMENT (src)); gst_element_set_eos (GST_ELEMENT (src));
return GST_BUFFER(gst_event_new (GST_EVENT_EOS)); return GST_DATA(gst_event_new (GST_EVENT_EOS));
} }
else { else {
if (src->rt_num_buffers > 0) if (src->rt_num_buffers > 0)
@ -772,7 +772,7 @@ gst_fakesrc_get(GstPad *pad)
if (src->eos) { if (src->eos) {
GST_INFO ( "fakesrc is setting eos on pad"); GST_INFO ( "fakesrc is setting eos on pad");
return GST_BUFFER(gst_event_new (GST_EVENT_EOS)); return GST_DATA(gst_event_new (GST_EVENT_EOS));
} }
buf = gst_fakesrc_create_buffer (src); buf = gst_fakesrc_create_buffer (src);
@ -795,7 +795,7 @@ gst_fakesrc_get(GstPad *pad)
GST_LOG_OBJECT (src, "post handoff emit"); GST_LOG_OBJECT (src, "post handoff emit");
} }
return buf; return GST_DATA (buf);
} }
/** /**
@ -819,10 +819,10 @@ gst_fakesrc_loop(GstElement *element)
while (pads) { while (pads) {
GstPad *pad = GST_PAD (pads->data); GstPad *pad = GST_PAD (pads->data);
GstBuffer *buf; GstData *data;
buf = gst_fakesrc_get (pad); data = gst_fakesrc_get (pad);
gst_pad_push (pad, buf); gst_pad_push (pad, data);
if (src->eos) { if (src->eos) {
return; return;

View file

@ -61,7 +61,7 @@ static void gst_fdsink_set_property (GObject *object, guint prop_id,
static void gst_fdsink_get_property (GObject *object, guint prop_id, static void gst_fdsink_get_property (GObject *object, guint prop_id,
GValue *value, GParamSpec *pspec); GValue *value, GParamSpec *pspec);
static void gst_fdsink_chain (GstPad *pad,GstBuffer *buf); static void gst_fdsink_chain (GstPad *pad,GstData *_data);
static GstElementClass *parent_class = NULL; static GstElementClass *parent_class = NULL;
/*static guint gst_fdsink_signals[LAST_SIGNAL] = { 0 };*/ /*static guint gst_fdsink_signals[LAST_SIGNAL] = { 0 };*/
@ -115,8 +115,9 @@ gst_fdsink_init (GstFdSink *fdsink)
} }
static void static void
gst_fdsink_chain (GstPad *pad, GstBuffer *buf) gst_fdsink_chain (GstPad *pad, GstData *_data)
{ {
GstBuffer *buf = GST_BUFFER (_data);
GstFdSink *fdsink; GstFdSink *fdsink;
g_return_if_fail (pad != NULL); g_return_if_fail (pad != NULL);

View file

@ -71,7 +71,7 @@ static void gst_fdsrc_set_property (GObject *object, guint prop_id,
static void gst_fdsrc_get_property (GObject *object, guint prop_id, static void gst_fdsrc_get_property (GObject *object, guint prop_id,
GValue *value, GParamSpec *pspec); GValue *value, GParamSpec *pspec);
static GstBuffer * gst_fdsrc_get (GstPad *pad); static GstData * gst_fdsrc_get (GstPad *pad);
static GstElementClass *parent_class = NULL; static GstElementClass *parent_class = NULL;
@ -177,7 +177,7 @@ gst_fdsrc_get_property (GObject *object, guint prop_id, GValue *value, GParamSpe
} }
} }
static GstBuffer * static GstData *
gst_fdsrc_get(GstPad *pad) gst_fdsrc_get(GstPad *pad)
{ {
GstFdSrc *src; GstFdSrc *src;
@ -195,14 +195,14 @@ gst_fdsrc_get(GstPad *pad)
/* if nothing was read, we're in eos */ /* if nothing was read, we're in eos */
if (readbytes == 0) { if (readbytes == 0) {
gst_element_set_eos (GST_ELEMENT (src)); gst_element_set_eos (GST_ELEMENT (src));
return GST_BUFFER (gst_event_new (GST_EVENT_EOS)); return GST_DATA (gst_event_new (GST_EVENT_EOS));
} }
if (readbytes == -1) { if (readbytes == -1) {
g_error ("Error reading from file descriptor. Ending stream.\n"); g_error ("Error reading from file descriptor. Ending stream.\n");
gst_element_set_eos (GST_ELEMENT (src)); gst_element_set_eos (GST_ELEMENT (src));
return GST_BUFFER (gst_event_new (GST_EVENT_EOS)); return GST_DATA (gst_event_new (GST_EVENT_EOS));
} }
GST_BUFFER_OFFSET (buf) = src->curoffset; GST_BUFFER_OFFSET (buf) = src->curoffset;
GST_BUFFER_SIZE (buf) = readbytes; GST_BUFFER_SIZE (buf) = readbytes;
@ -210,5 +210,5 @@ gst_fdsrc_get(GstPad *pad)
src->curoffset += readbytes; src->curoffset += readbytes;
/* we're done, return the buffer */ /* we're done, return the buffer */
return buf; return GST_DATA (buf);
} }

View file

@ -83,7 +83,7 @@ static void gst_filesink_close_file (GstFileSink *sink);
static gboolean gst_filesink_handle_event (GstPad *pad, GstEvent *event); static gboolean gst_filesink_handle_event (GstPad *pad, GstEvent *event);
static gboolean gst_filesink_pad_query (GstPad *pad, GstQueryType type, static gboolean gst_filesink_pad_query (GstPad *pad, GstQueryType type,
GstFormat *format, gint64 *value); GstFormat *format, gint64 *value);
static void gst_filesink_chain (GstPad *pad,GstBuffer *buf); static void gst_filesink_chain (GstPad *pad,GstData *_data);
static GstElementStateReturn gst_filesink_change_state (GstElement *element); static GstElementStateReturn gst_filesink_change_state (GstElement *element);
@ -361,8 +361,9 @@ gst_filesink_handle_event (GstPad *pad, GstEvent *event)
* take the buffer from the pad and write to file if it's open * take the buffer from the pad and write to file if it's open
*/ */
static void static void
gst_filesink_chain (GstPad *pad, GstBuffer *buf) gst_filesink_chain (GstPad *pad, GstData *_data)
{ {
GstBuffer *buf = GST_BUFFER (_data);
GstFileSink *filesink; GstFileSink *filesink;
g_return_if_fail (pad != NULL); g_return_if_fail (pad != NULL);

View file

@ -142,7 +142,7 @@ static void gst_filesrc_set_property (GObject *object, guint prop_id,
static void gst_filesrc_get_property (GObject *object, guint prop_id, static void gst_filesrc_get_property (GObject *object, guint prop_id,
GValue *value, GParamSpec *pspec); GValue *value, GParamSpec *pspec);
static GstBuffer * gst_filesrc_get (GstPad *pad); static GstData * gst_filesrc_get (GstPad *pad);
static gboolean gst_filesrc_srcpad_event (GstPad *pad, GstEvent *event); static gboolean gst_filesrc_srcpad_event (GstPad *pad, GstEvent *event);
static gboolean gst_filesrc_srcpad_query (GstPad *pad, GstQueryType type, static gboolean gst_filesrc_srcpad_query (GstPad *pad, GstQueryType type,
GstFormat *format, gint64 *value); GstFormat *format, gint64 *value);
@ -650,7 +650,7 @@ gst_filesrc_get_read (GstFileSrc *src)
return buf; return buf;
} }
static GstBuffer * static GstData *
gst_filesrc_get (GstPad *pad) gst_filesrc_get (GstPad *pad)
{ {
GstFileSrc *src; GstFileSrc *src;
@ -667,13 +667,13 @@ gst_filesrc_get (GstPad *pad)
GST_DEBUG ("filesrc sending discont"); GST_DEBUG ("filesrc sending discont");
event = gst_event_new_discontinuous (FALSE, GST_FORMAT_BYTES, src->curoffset, NULL); event = gst_event_new_discontinuous (FALSE, GST_FORMAT_BYTES, src->curoffset, NULL);
src->need_flush = FALSE; src->need_flush = FALSE;
return GST_BUFFER (event); return GST_DATA (event);
} }
/* check for flush */ /* check for flush */
if (src->need_flush) { if (src->need_flush) {
src->need_flush = FALSE; src->need_flush = FALSE;
GST_DEBUG ("filesrc sending flush"); GST_DEBUG ("filesrc sending flush");
return GST_BUFFER (gst_event_new_flush ()); return GST_DATA (gst_event_new_flush ());
} }
/* check for EOF */ /* check for EOF */
@ -681,13 +681,13 @@ gst_filesrc_get (GstPad *pad)
GST_DEBUG ("filesrc eos %" G_GINT64_FORMAT" %" G_GINT64_FORMAT, GST_DEBUG ("filesrc eos %" G_GINT64_FORMAT" %" G_GINT64_FORMAT,
src->curoffset, src->filelen); src->curoffset, src->filelen);
gst_element_set_eos (GST_ELEMENT (src)); gst_element_set_eos (GST_ELEMENT (src));
return GST_BUFFER (gst_event_new (GST_EVENT_EOS)); return GST_DATA (gst_event_new (GST_EVENT_EOS));
} }
if (src->using_mmap){ if (src->using_mmap){
return gst_filesrc_get_mmap (src); return GST_DATA (gst_filesrc_get_mmap (src));
}else{ }else{
return gst_filesrc_get_read (src); return GST_DATA (gst_filesrc_get_read (src));
} }
} }

View file

@ -70,7 +70,7 @@ static void gst_identity_init (GstIdentity *identity);
static void gst_identity_set_property (GObject *object, guint prop_id, const GValue *value, GParamSpec *pspec); static void gst_identity_set_property (GObject *object, guint prop_id, const GValue *value, GParamSpec *pspec);
static void gst_identity_get_property (GObject *object, guint prop_id, GValue *value, GParamSpec *pspec); static void gst_identity_get_property (GObject *object, guint prop_id, GValue *value, GParamSpec *pspec);
static void gst_identity_chain (GstPad *pad, GstBuffer *buf); static void gst_identity_chain (GstPad *pad, GstData *_data);
static GstElementClass *parent_class = NULL; static GstElementClass *parent_class = NULL;
static guint gst_identity_signals[LAST_SIGNAL] = { 0 }; static guint gst_identity_signals[LAST_SIGNAL] = { 0 };
@ -224,8 +224,9 @@ gst_identity_init (GstIdentity *identity)
} }
static void static void
gst_identity_chain (GstPad *pad, GstBuffer *buf) gst_identity_chain (GstPad *pad, GstData *_data)
{ {
GstBuffer *buf = GST_BUFFER (_data);
GstIdentity *identity; GstIdentity *identity;
guint i; guint i;
@ -286,7 +287,7 @@ gst_identity_chain (GstPad *pad, GstBuffer *buf)
if (i>1) if (i>1)
gst_buffer_ref (buf); gst_buffer_ref (buf);
gst_pad_push (identity->srcpad, buf); gst_pad_push (identity->srcpad, GST_DATA (buf));
if (identity->sleep_time) if (identity->sleep_time)
g_usleep (identity->sleep_time); g_usleep (identity->sleep_time);
@ -304,7 +305,7 @@ gst_identity_loop (GstElement *element)
identity = GST_IDENTITY (element); identity = GST_IDENTITY (element);
buf = gst_pad_pull (identity->sinkpad); buf = GST_BUFFER (gst_pad_pull (identity->sinkpad));
if (GST_IS_EVENT (buf)) { if (GST_IS_EVENT (buf)) {
GstEvent *event = GST_EVENT (buf); GstEvent *event = GST_EVENT (buf);
@ -316,7 +317,7 @@ gst_identity_loop (GstElement *element)
} }
} }
else { else {
gst_identity_chain (identity->sinkpad, buf); gst_identity_chain (identity->sinkpad, GST_DATA (buf));
} }
} }

View file

@ -63,7 +63,7 @@ static void gst_md5sink_init (GstMD5Sink *md5sink);
static void gst_md5sink_get_property (GObject *object, guint prop_id, static void gst_md5sink_get_property (GObject *object, guint prop_id,
GValue *value, GParamSpec *pspec); GValue *value, GParamSpec *pspec);
static void gst_md5sink_chain (GstPad *pad, GstBuffer *buf); static void gst_md5sink_chain (GstPad *pad, GstData *_data);
static GstElementStateReturn gst_md5sink_change_state (GstElement *element); static GstElementStateReturn gst_md5sink_change_state (GstElement *element);
/* variables */ /* variables */
@ -489,8 +489,9 @@ gst_md5sink_get_property (GObject *object, guint prop_id, GValue *value, GParamS
} }
static void static void
gst_md5sink_chain (GstPad *pad, GstBuffer *buf) gst_md5sink_chain (GstPad *pad, GstData *_data)
{ {
GstBuffer *buf = GST_BUFFER (_data);
GstMD5Sink *md5sink; GstMD5Sink *md5sink;
g_return_if_fail (pad != NULL); g_return_if_fail (pad != NULL);

View file

@ -63,7 +63,7 @@ static void gst_multidisksrc_init (GstMultiDiskSrc *disksrc);
static void gst_multidisksrc_set_property (GObject *object, guint prop_id, const GValue *value, GParamSpec *pspec); static void gst_multidisksrc_set_property (GObject *object, guint prop_id, const GValue *value, GParamSpec *pspec);
static void gst_multidisksrc_get_property (GObject *object, guint prop_id, GValue *value, GParamSpec *pspec); static void gst_multidisksrc_get_property (GObject *object, guint prop_id, GValue *value, GParamSpec *pspec);
static GstBuffer * gst_multidisksrc_get (GstPad *pad); static GstData * gst_multidisksrc_get (GstPad *pad);
/*static GstBuffer * gst_multidisksrc_get_region (GstPad *pad,GstRegionType type,guint64 offset,guint64 len);*/ /*static GstBuffer * gst_multidisksrc_get_region (GstPad *pad,GstRegionType type,guint64 offset,guint64 len);*/
static GstElementStateReturn gst_multidisksrc_change_state (GstElement *element); static GstElementStateReturn gst_multidisksrc_change_state (GstElement *element);
@ -195,7 +195,7 @@ gst_multidisksrc_get_property (GObject *object, guint prop_id, GValue *value, GP
* *
* Push a new buffer from the disksrc at the current offset. * Push a new buffer from the disksrc at the current offset.
*/ */
static GstBuffer * static GstData *
gst_multidisksrc_get (GstPad *pad) gst_multidisksrc_get (GstPad *pad)
{ {
GstMultiDiskSrc *src; GstMultiDiskSrc *src;
@ -209,7 +209,7 @@ gst_multidisksrc_get (GstPad *pad)
gst_multidisksrc_close_file(src); gst_multidisksrc_close_file(src);
if (!src->listptr) { if (!src->listptr) {
return GST_BUFFER(gst_event_new (GST_EVENT_EOS)); return GST_DATA (gst_event_new (GST_EVENT_EOS));
} }
list = src->listptr; list = src->listptr;
@ -240,7 +240,7 @@ gst_multidisksrc_get (GstPad *pad)
} }
/* we're done, return the buffer */ /* we're done, return the buffer */
return buf; return GST_DATA (buf);
} }
/* open the file and mmap it, necessary to go to READY state */ /* open the file and mmap it, necessary to go to READY state */

View file

@ -63,7 +63,7 @@ static void gst_multidisksrc_init (GstMultiDiskSrc *disksrc);
static void gst_multidisksrc_set_property (GObject *object, guint prop_id, const GValue *value, GParamSpec *pspec); static void gst_multidisksrc_set_property (GObject *object, guint prop_id, const GValue *value, GParamSpec *pspec);
static void gst_multidisksrc_get_property (GObject *object, guint prop_id, GValue *value, GParamSpec *pspec); static void gst_multidisksrc_get_property (GObject *object, guint prop_id, GValue *value, GParamSpec *pspec);
static GstBuffer * gst_multidisksrc_get (GstPad *pad); static GstData * gst_multidisksrc_get (GstPad *pad);
/*static GstBuffer * gst_multidisksrc_get_region (GstPad *pad,GstRegionType type,guint64 offset,guint64 len);*/ /*static GstBuffer * gst_multidisksrc_get_region (GstPad *pad,GstRegionType type,guint64 offset,guint64 len);*/
static GstElementStateReturn gst_multidisksrc_change_state (GstElement *element); static GstElementStateReturn gst_multidisksrc_change_state (GstElement *element);
@ -195,7 +195,7 @@ gst_multidisksrc_get_property (GObject *object, guint prop_id, GValue *value, GP
* *
* Push a new buffer from the disksrc at the current offset. * Push a new buffer from the disksrc at the current offset.
*/ */
static GstBuffer * static GstData *
gst_multidisksrc_get (GstPad *pad) gst_multidisksrc_get (GstPad *pad)
{ {
GstMultiDiskSrc *src; GstMultiDiskSrc *src;
@ -209,7 +209,7 @@ gst_multidisksrc_get (GstPad *pad)
gst_multidisksrc_close_file(src); gst_multidisksrc_close_file(src);
if (!src->listptr) { if (!src->listptr) {
return GST_BUFFER(gst_event_new (GST_EVENT_EOS)); return GST_DATA (gst_event_new (GST_EVENT_EOS));
} }
list = src->listptr; list = src->listptr;
@ -240,7 +240,7 @@ gst_multidisksrc_get (GstPad *pad)
} }
/* we're done, return the buffer */ /* we're done, return the buffer */
return buf; return GST_DATA (buf);
} }
/* open the file and mmap it, necessary to go to READY state */ /* open the file and mmap it, necessary to go to READY state */

View file

@ -68,8 +68,8 @@ static void gst_pipefilter_init (GstPipefilter *pipefilter);
static void gst_pipefilter_set_property (GObject *object, guint prop_id, const GValue *value, GParamSpec *pspec); static void gst_pipefilter_set_property (GObject *object, guint prop_id, const GValue *value, GParamSpec *pspec);
static void gst_pipefilter_get_property (GObject *object, guint prop_id, GValue *value, GParamSpec *pspec); static void gst_pipefilter_get_property (GObject *object, guint prop_id, GValue *value, GParamSpec *pspec);
static GstBuffer* gst_pipefilter_get (GstPad *pad); static GstData* gst_pipefilter_get (GstPad *pad);
static void gst_pipefilter_chain (GstPad *pad, GstBuffer *buf); static void gst_pipefilter_chain (GstPad *pad, GstData *_data);
static gboolean gst_pipefilter_handle_event (GstPad *pad, GstEvent *event); static gboolean gst_pipefilter_handle_event (GstPad *pad, GstEvent *event);
static GstElementStateReturn gst_pipefilter_change_state (GstElement *element); static GstElementStateReturn gst_pipefilter_change_state (GstElement *element);
@ -155,7 +155,7 @@ gst_pipefilter_handle_event (GstPad *pad, GstEvent *event)
return TRUE; return TRUE;
} }
static GstBuffer* static GstData*
gst_pipefilter_get (GstPad *pad) gst_pipefilter_get (GstPad *pad)
{ {
GstPipefilter *pipefilter; GstPipefilter *pipefilter;
@ -184,7 +184,7 @@ gst_pipefilter_get (GstPad *pad)
} }
/* if we didn't get as many bytes as we asked for, we're at EOF */ /* if we didn't get as many bytes as we asked for, we're at EOF */
if (readbytes == 0) { if (readbytes == 0) {
return GST_BUFFER(gst_event_new (GST_EVENT_EOS)); return GST_DATA (gst_event_new (GST_EVENT_EOS));
} }
@ -192,12 +192,13 @@ gst_pipefilter_get (GstPad *pad)
GST_BUFFER_SIZE(newbuf) = readbytes; GST_BUFFER_SIZE(newbuf) = readbytes;
pipefilter->curoffset += readbytes; pipefilter->curoffset += readbytes;
return newbuf; return GST_DATA (newbuf);
} }
static void static void
gst_pipefilter_chain (GstPad *pad,GstBuffer *buf) gst_pipefilter_chain (GstPad *pad,GstData *_data)
{ {
GstBuffer *buf = GST_BUFFER (_data);
GstPipefilter *pipefilter; GstPipefilter *pipefilter;
glong writebytes; glong writebytes;
guchar *data; guchar *data;

View file

@ -72,8 +72,8 @@ static void gst_queue_set_property (GObject *object, guint prop_id,
static void gst_queue_get_property (GObject *object, guint prop_id, static void gst_queue_get_property (GObject *object, guint prop_id,
GValue *value, GParamSpec *pspec); GValue *value, GParamSpec *pspec);
static void gst_queue_chain (GstPad *pad, GstBuffer *buf); static void gst_queue_chain (GstPad *pad, GstData *data);
static GstBuffer * gst_queue_get (GstPad *pad); static GstData * gst_queue_get (GstPad *pad);
static GstBufferPool* gst_queue_get_bufferpool (GstPad *pad); static GstBufferPool* gst_queue_get_bufferpool (GstPad *pad);
static gboolean gst_queue_handle_src_event (GstPad *pad, GstEvent *event); static gboolean gst_queue_handle_src_event (GstPad *pad, GstEvent *event);
@ -300,13 +300,13 @@ gst_queue_locked_flush (GstQueue *queue)
} }
static void static void
gst_queue_chain (GstPad *pad, GstBuffer *buf) gst_queue_chain (GstPad *pad, GstData *data)
{ {
GstQueue *queue; GstQueue *queue;
g_return_if_fail (pad != NULL); g_return_if_fail (pad != NULL);
g_return_if_fail (GST_IS_PAD (pad)); g_return_if_fail (GST_IS_PAD (pad));
g_return_if_fail (buf != NULL); g_return_if_fail (data != NULL);
queue = GST_QUEUE (GST_OBJECT_PARENT (pad)); queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
@ -319,33 +319,35 @@ gst_queue_chain (GstPad *pad, GstBuffer *buf)
GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "event sent\n"); GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "event sent\n");
} }
g_async_queue_unlock(queue->events); g_async_queue_unlock(queue->events);
restart: restart:
/* we have to lock the queue since we span threads */ /* we have to lock the queue since we span threads */
GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, "locking t:%p", g_thread_self ()); GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, "locking t:%p", g_thread_self ());
g_mutex_lock (queue->qlock); g_mutex_lock (queue->qlock);
GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, "locked t:%p", g_thread_self ()); GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, "locked t:%p", g_thread_self ());
/* assume don't need to flush this buffer when the queue is filled */ /* assume don't need to flush this buffer when the queue is filled */
queue->flush = FALSE; queue->flush = FALSE;
if (GST_IS_EVENT (buf)) { if (GST_IS_EVENT (data)) {
switch (GST_EVENT_TYPE (buf)) { switch (GST_EVENT_TYPE (data)) {
case GST_EVENT_FLUSH: case GST_EVENT_FLUSH:
GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "FLUSH event, flushing queue\n"); GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "FLUSH event, flushing queue\n");
gst_queue_locked_flush (queue); gst_queue_locked_flush (queue);
break; break;
case GST_EVENT_EOS: case GST_EVENT_EOS:
GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "eos in on %s %d\n", GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "eos in on %s %d\n",
GST_ELEMENT_NAME (queue), queue->level_buffers); GST_ELEMENT_NAME (queue), queue->level_buffers);
break; break;
default: default:
/* we put the event in the queue, we don't have to act ourselves */ /* we put the event in the queue, we don't have to act ourselves */
break; break;
} }
} }
GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, "adding buffer %p of size %d",buf,GST_BUFFER_SIZE(buf)); if (GST_IS_BUFFER (data))
GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue,
"adding buffer %p of size %d", data, GST_BUFFER_SIZE (data));
if (queue->level_buffers == queue->size_buffers) { if (queue->level_buffers == queue->size_buffers) {
g_mutex_unlock (queue->qlock); g_mutex_unlock (queue->qlock);
@ -358,10 +360,10 @@ restart:
/* if we leak on the upstream side, drop the current buffer */ /* if we leak on the upstream side, drop the current buffer */
if (queue->leaky == GST_QUEUE_LEAK_UPSTREAM) { if (queue->leaky == GST_QUEUE_LEAK_UPSTREAM) {
GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "queue is full, leaking buffer on upstream end"); GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "queue is full, leaking buffer on upstream end");
if (GST_IS_EVENT (buf)) if (GST_IS_EVENT (data))
fprintf(stderr, "Error: queue [%s] leaked an event, type:%d\n", fprintf(stderr, "Error: queue [%s] leaked an event, type:%d\n",
GST_ELEMENT_NAME(GST_ELEMENT(queue)), GST_ELEMENT_NAME(GST_ELEMENT(queue)),
GST_EVENT_TYPE(GST_EVENT(buf))); GST_EVENT_TYPE(GST_EVENT(data)));
/* now we have to clean up and exit right away */ /* now we have to clean up and exit right away */
g_mutex_unlock (queue->qlock); g_mutex_unlock (queue->qlock);
goto out_unref; goto out_unref;
@ -369,21 +371,23 @@ restart:
/* otherwise we have to push a buffer off the other end */ /* otherwise we have to push a buffer off the other end */
else { else {
gpointer front; gpointer front;
GstBuffer *leakbuf; GstData *leak;
GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "queue is full, leaking buffer on downstream end"); GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "queue is full, leaking buffer on downstream end");
front = g_queue_pop_head (queue->queue); front = g_queue_pop_head (queue->queue);
leakbuf = (GstBuffer *)(front); leak = GST_DATA (front);
if (GST_IS_EVENT (leakbuf)) { queue->level_buffers--;
if (GST_IS_EVENT (leak)) {
fprintf(stderr, "Error: queue [%s] leaked an event, type:%d\n", fprintf(stderr, "Error: queue [%s] leaked an event, type:%d\n",
GST_ELEMENT_NAME(GST_ELEMENT(queue)), GST_ELEMENT_NAME(GST_ELEMENT(queue)),
GST_EVENT_TYPE(GST_EVENT(leakbuf))); GST_EVENT_TYPE(GST_EVENT(leak)));
} } else {
queue->level_buffers--; queue->level_bytes -= GST_BUFFER_SIZE(leak);
queue->level_bytes -= GST_BUFFER_SIZE(leakbuf); }
gst_data_unref (GST_DATA (leakbuf));
gst_data_unref (leak);
} }
} }
@ -412,7 +416,7 @@ restart:
/* try to signal to resolve the error */ /* try to signal to resolve the error */
if (!queue->may_deadlock) { if (!queue->may_deadlock) {
g_mutex_unlock (queue->qlock); g_mutex_unlock (queue->qlock);
gst_data_unref (GST_DATA (buf)); gst_data_unref (data);
gst_element_error (GST_ELEMENT (queue), "deadlock found, source pad elements are shut down"); gst_element_error (GST_ELEMENT (queue), "deadlock found, source pad elements are shut down");
/* we don't want to goto out_unref here, since we want to clean up before calling gst_element_error */ /* we don't want to goto out_unref here, since we want to clean up before calling gst_element_error */
return; return;
@ -428,14 +432,15 @@ restart:
GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "got not_full signal"); GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "got not_full signal");
} }
GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, "post full wait, level:%d/%d buffers, %d bytes", GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, "post full wait, level:%d/%d buffers, %d bytes",
queue->level_buffers, queue->size_buffers, queue->level_bytes); queue->level_buffers, queue->size_buffers, queue->level_bytes);
} }
/* put the buffer on the tail of the list */ /* put the buffer on the tail of the list */
g_queue_push_tail (queue->queue, buf); g_queue_push_tail (queue->queue, data);
queue->level_buffers++; queue->level_buffers++;
queue->level_bytes += GST_BUFFER_SIZE(buf); if (GST_IS_BUFFER (data))
queue->level_bytes += GST_BUFFER_SIZE (data);
/* this assertion _has_ to hold */ /* this assertion _has_ to hold */
g_assert (queue->queue->length == queue->level_buffers); g_assert (queue->queue->length == queue->level_buffers);
@ -451,15 +456,15 @@ restart:
return; return;
out_unref: out_unref:
gst_data_unref (GST_DATA (buf)); gst_data_unref (data);
return; return;
} }
static GstBuffer * static GstData *
gst_queue_get (GstPad *pad) gst_queue_get (GstPad *pad)
{ {
GstQueue *queue; GstQueue *queue;
GstBuffer *buf = NULL; GstData *data = NULL;
gpointer front; gpointer front;
g_assert(pad != NULL); g_assert(pad != NULL);
@ -486,7 +491,7 @@ restart:
GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "interrupted!!"); GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "interrupted!!");
g_mutex_unlock (queue->qlock); g_mutex_unlock (queue->qlock);
if (gst_scheduler_interrupt (gst_pad_get_scheduler (queue->srcpad), GST_ELEMENT (queue))) if (gst_scheduler_interrupt (gst_pad_get_scheduler (queue->srcpad), GST_ELEMENT (queue)))
return GST_BUFFER (gst_event_new (GST_EVENT_INTERRUPT)); return GST_DATA (gst_event_new (GST_EVENT_INTERRUPT));
goto restart; goto restart;
} }
if (GST_STATE (queue) != GST_STATE_PLAYING) { if (GST_STATE (queue) != GST_STATE_PLAYING) {
@ -512,7 +517,7 @@ restart:
if (!g_cond_timed_wait (queue->not_empty, queue->qlock, &timeout)){ if (!g_cond_timed_wait (queue->not_empty, queue->qlock, &timeout)){
g_mutex_unlock (queue->qlock); g_mutex_unlock (queue->qlock);
g_warning ("filler"); g_warning ("filler");
return GST_BUFFER(gst_event_new_filler()); return GST_DATA (gst_event_new_filler());
} }
} }
else { else {
@ -524,11 +529,12 @@ restart:
queue->level_buffers, queue->size_buffers, queue->level_bytes); queue->level_buffers, queue->size_buffers, queue->level_bytes);
front = g_queue_pop_head (queue->queue); front = g_queue_pop_head (queue->queue);
buf = (GstBuffer *)(front); data = GST_DATA (front);
GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, "retrieved buffer %p from queue", buf); GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, "retrieved data %p from queue", data);
queue->level_buffers--; queue->level_buffers--;
queue->level_bytes -= GST_BUFFER_SIZE(buf); if (GST_IS_BUFFER (data))
queue->level_bytes -= GST_BUFFER_SIZE (data);
GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, "(%s:%s)- level:%d/%d buffers, %d bytes", GST_CAT_LOG_OBJECT (GST_CAT_DATAFLOW, queue, "(%s:%s)- level:%d/%d buffers, %d bytes",
GST_DEBUG_PAD_NAME(pad), GST_DEBUG_PAD_NAME(pad),
@ -543,8 +549,8 @@ restart:
g_mutex_unlock (queue->qlock); g_mutex_unlock (queue->qlock);
/* FIXME where should this be? locked? */ /* FIXME where should this be? locked? */
if (GST_IS_EVENT(buf)) { if (GST_IS_EVENT (data)) {
GstEvent *event = GST_EVENT(buf); GstEvent *event = GST_EVENT (data);
switch (GST_EVENT_TYPE(event)) { switch (GST_EVENT_TYPE(event)) {
case GST_EVENT_EOS: case GST_EVENT_EOS:
GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "queue \"%s\" eos", GST_ELEMENT_NAME (queue)); GST_CAT_DEBUG_OBJECT (GST_CAT_DATAFLOW, queue, "queue \"%s\" eos", GST_ELEMENT_NAME (queue));
@ -555,7 +561,7 @@ restart:
} }
} }
return buf; return data;
} }

View file

@ -286,14 +286,14 @@ gst_shaper_loop (GstElement *element)
if (connection->buffer == NULL && GST_PAD_IS_USABLE (connection->sinkpad)) { if (connection->buffer == NULL && GST_PAD_IS_USABLE (connection->sinkpad)) {
GstBuffer *buffer; GstBuffer *buffer;
buffer = gst_pad_pull (connection->sinkpad); buffer = GST_BUFFER (gst_pad_pull (connection->sinkpad));
/* events are simply pushed ASAP */ /* events are simply pushed ASAP */
if (GST_IS_EVENT (buffer)) { if (GST_IS_EVENT (buffer)) {
/* save event type as it will be unreffed after the next push */ /* save event type as it will be unreffed after the next push */
GstEventType type = GST_EVENT_TYPE (buffer); GstEventType type = GST_EVENT_TYPE (buffer);
gst_pad_push (connection->srcpad, buffer); gst_pad_push (connection->srcpad, GST_DATA (buffer));
switch (type) { switch (type) {
/* on EOS we disable the pad so that we don't pull on /* on EOS we disable the pad so that we don't pull on
@ -322,7 +322,7 @@ gst_shaper_loop (GstElement *element)
} }
/* if we have a connection with a buffer, push it */ /* if we have a connection with a buffer, push it */
if (min != NULL && min->buffer) { if (min != NULL && min->buffer) {
gst_pad_push (min->srcpad, min->buffer); gst_pad_push (min->srcpad, GST_DATA (min->buffer));
min->buffer = NULL; min->buffer = NULL;
/* since we pushed a buffer, it's not EOS */ /* since we pushed a buffer, it's not EOS */
eos = FALSE; eos = FALSE;

View file

@ -67,7 +67,7 @@ static void gst_statistics_init (GstStatistics *statistics);
static void gst_statistics_set_property (GObject *object, guint prop_id, const GValue *value, GParamSpec *pspec); static void gst_statistics_set_property (GObject *object, guint prop_id, const GValue *value, GParamSpec *pspec);
static void gst_statistics_get_property (GObject *object, guint prop_id, GValue *value, GParamSpec *pspec); static void gst_statistics_get_property (GObject *object, guint prop_id, GValue *value, GParamSpec *pspec);
static void gst_statistics_chain (GstPad *pad, GstBuffer *buf); static void gst_statistics_chain (GstPad *pad, GstData *_data);
static void gst_statistics_reset (GstStatistics *statistics); static void gst_statistics_reset (GstStatistics *statistics);
static void gst_statistics_print (GstStatistics *statistics); static void gst_statistics_print (GstStatistics *statistics);
@ -256,8 +256,9 @@ gst_statistics_print (GstStatistics *statistics)
} }
static void static void
gst_statistics_chain (GstPad *pad, GstBuffer *buf) gst_statistics_chain (GstPad *pad, GstData *_data)
{ {
GstBuffer *buf = GST_BUFFER (_data);
GstStatistics *statistics; GstStatistics *statistics;
gboolean update = FALSE; gboolean update = FALSE;
@ -313,7 +314,7 @@ gst_statistics_chain (GstPad *pad, GstBuffer *buf)
gst_statistics_print(statistics); gst_statistics_print(statistics);
} }
} }
gst_pad_push (statistics->srcpad, buf); gst_pad_push (statistics->srcpad, GST_DATA (buf));
} }
static void static void

View file

@ -71,7 +71,7 @@ static void gst_tee_set_property (GObject *object, guint prop_id,
static void gst_tee_get_property (GObject *object, guint prop_id, static void gst_tee_get_property (GObject *object, guint prop_id,
GValue *value, GParamSpec *pspec); GValue *value, GParamSpec *pspec);
static void gst_tee_chain (GstPad *pad, GstBuffer *buf); static void gst_tee_chain (GstPad *pad, GstData *_data);
static GstElementClass *parent_class = NULL; static GstElementClass *parent_class = NULL;
@ -342,8 +342,9 @@ gst_tee_get_property (GObject *object, guint prop_id, GValue *value, GParamSpec
* Chain a buffer on a pad. * Chain a buffer on a pad.
*/ */
static void static void
gst_tee_chain (GstPad *pad, GstBuffer *buf) gst_tee_chain (GstPad *pad, GstData *_data)
{ {
GstBuffer *buf = GST_BUFFER (_data);
GstTee *tee; GstTee *tee;
const GList *pads; const GList *pads;
@ -373,7 +374,7 @@ gst_tee_chain (GstPad *pad, GstBuffer *buf)
} }
if (GST_PAD_IS_USABLE (outpad)) if (GST_PAD_IS_USABLE (outpad))
gst_pad_push (outpad, buf); gst_pad_push (outpad, GST_DATA (buf));
else else
gst_buffer_unref (buf); gst_buffer_unref (buf);
} }

View file

@ -92,7 +92,7 @@ GST_PAD_TEMPLATE_FACTORY (src_factory,
static void gst_example_class_init (GstExampleClass *klass); static void gst_example_class_init (GstExampleClass *klass);
static void gst_example_init (GstExample *example); static void gst_example_init (GstExample *example);
static void gst_example_chain (GstPad *pad, GstBuffer *buf); static void gst_example_chain (GstPad *pad, GstData *_data);
static void gst_example_set_property (GObject *object, guint prop_id, static void gst_example_set_property (GObject *object, guint prop_id,
const GValue *value, GParamSpec *pspec); const GValue *value, GParamSpec *pspec);
@ -227,8 +227,9 @@ gst_example_init(GstExample *example)
* as the buffer provided by the peer element. * as the buffer provided by the peer element.
*/ */
static void static void
gst_example_chain (GstPad *pad, GstBuffer *buf) gst_example_chain (GstPad *pad, GstData *_data)
{ {
GstBuffer *buf = GST_BUFFER (_data);
GstExample *example; GstExample *example;
GstBuffer *outbuf; GstBuffer *outbuf;
@ -272,7 +273,7 @@ gst_example_chain (GstPad *pad, GstBuffer *buf)
* in the pipeline, through the element's source pad, which is stored * in the pipeline, through the element's source pad, which is stored
* in the element's structure. * in the element's structure.
*/ */
gst_pad_push(example->srcpad,outbuf); gst_pad_push(example->srcpad,GST_DATA (outbuf));
/* For fun we'll emit our useless signal here */ /* For fun we'll emit our useless signal here */
g_signal_emit(G_OBJECT (example), gst_example_signals[ASDF], 0, g_signal_emit(G_OBJECT (example), gst_example_signals[ASDF], 0,
@ -280,7 +281,7 @@ gst_example_chain (GstPad *pad, GstBuffer *buf)
/* If we're not doing something, just send the original incoming buffer. */ /* If we're not doing something, just send the original incoming buffer. */
} else { } else {
gst_pad_push(example->srcpad,buf); gst_pad_push(example->srcpad,GST_DATA (buf));
} }
} }