From 009d01a841c571b1333887fd4845117a8eaf96da Mon Sep 17 00:00:00 2001 From: Jan Schmidt Date: Wed, 9 Mar 2005 11:45:03 +0000 Subject: [PATCH] Add GST_EVENT_COMMON_FLAG_NEED_RESPONSE flag (TRUE by default) to indicate whether a reply is expected to an upstream... Original commit message from CVS: * docs/gst/gstreamer-sections.txt: * docs/gst/tmpl/gstevent.sgml: * gst/gstevent.c: (gst_event_new): * gst/gstevent.h: Add GST_EVENT_COMMON_FLAG_NEED_RESPONSE flag (TRUE by default) to indicate whether a reply is expected to an upstream event. * gst/gstqueue.c: (gst_queue_finalize), (gst_queue_handle_pending_events), (gst_queue_handle_src_event): If GST_EVENT_COMMON_FLAG_NEED_RESPONSE is not set, just queue the event without blocking on a reply. --- ChangeLog | 28 +++++++++---------- docs/gst/gstreamer-sections.txt | 3 ++ docs/gst/tmpl/gstevent.sgml | 8 ++++++ gst/gstevent.c | 1 + gst/gstevent.h | 12 ++++++++ gst/gstqueue.c | 49 +++++++++++++++++++++++++-------- plugins/elements/gstqueue.c | 49 +++++++++++++++++++++++++-------- 7 files changed, 114 insertions(+), 36 deletions(-) diff --git a/ChangeLog b/ChangeLog index baa30417b0..27b4765d12 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,17 @@ +2005-03-09 Jan Schmidt + + * docs/gst/gstreamer-sections.txt: + * docs/gst/tmpl/gstevent.sgml: + * gst/gstevent.c: (gst_event_new): + * gst/gstevent.h: + Add GST_EVENT_COMMON_FLAG_NEED_RESPONSE flag (TRUE by default) + to indicate whether a reply is expected to an upstream event. + + * gst/gstqueue.c: (gst_queue_finalize), + (gst_queue_handle_pending_events), (gst_queue_handle_src_event): + If GST_EVENT_COMMON_FLAG_NEED_RESPONSE is not set, just queue the + event without blocking on a reply. + 2005-03-09 Ronald S. Bultje * docs/gst/Makefile.am: @@ -61,20 +75,6 @@ Return INTERRUPT event if the peer is unlinked when restarting during a pad_pull. -2005-03-04 Jan Schmidt - - * docs/gst/gstreamer-sections.txt: - * docs/gst/tmpl/gstevent.sgml: - * gst/gstevent.c: (gst_event_new): - * gst/gstevent.h: - Add GST_EVENT_COMMON_FLAG_NEED_RESPONSE flag (TRUE by default) - to indicate whether a reply is expected to an upstream event. - - * gst/gstqueue.c: (gst_queue_finalize), - (gst_queue_handle_pending_events), (gst_queue_handle_src_event): - If GST_EVENT_COMMON_FLAG_NEED_RESPONSE is not set, just queue the - event without blocking on a reply. - 2005-03-03 Ronald S. Bultje * gst/gstqueue.c: (gst_queue_chain): diff --git a/docs/gst/gstreamer-sections.txt b/docs/gst/gstreamer-sections.txt index 9931515ffa..d0781099c9 100644 --- a/docs/gst/gstreamer-sections.txt +++ b/docs/gst/gstreamer-sections.txt @@ -573,6 +573,7 @@ GST_SEEK_FORMAT_MASK GST_SEEK_METHOD_MASK GST_SEEK_FLAGS_MASK GstEventFlag +GstEventCommonFlag GST_EVENT_MASK_FUNCTION GstSeekType GstSeekAccuracy @@ -613,9 +614,11 @@ GST_TYPE_EVENT_FLAG GST_TYPE_EVENT_TYPE GST_TYPE_SEEK_ACCURACY GST_TYPE_SEEK_TYPE +GST_TYPE_EVENT_COMMON_FLAG gst_event_get_type gst_event_flag_get_type +gst_event_common_flag_get_type gst_event_type_get_type gst_seek_accuracy_get_type gst_seek_type_get_type diff --git a/docs/gst/tmpl/gstevent.sgml b/docs/gst/tmpl/gstevent.sgml index 5a6c1ad7e7..1dcba86d2c 100644 --- a/docs/gst/tmpl/gstevent.sgml +++ b/docs/gst/tmpl/gstevent.sgml @@ -156,6 +156,14 @@ Event flags are used when querying for supported events @GST_EVENT_FLAG_NONE: no value @GST_RATE_FLAG_NEGATIVE: indicates negative rates are supported + + + + + +@GST_EVENT_COMMON_FLAG_NEED_RESPONSE: +@GST_EVENT_COMMON_FLAG_LAST: + A convenience macro to create event mask functions diff --git a/gst/gstevent.c b/gst/gstevent.c index 5a64088b20..eb8349b622 100644 --- a/gst/gstevent.c +++ b/gst/gstevent.c @@ -183,6 +183,7 @@ gst_event_new (GstEventType type) GST_EVENT_TYPE (event) = type; GST_EVENT_TIMESTAMP (event) = G_GINT64_CONSTANT (0); GST_EVENT_SRC (event) = NULL; + GST_DATA_FLAG_SET (GST_DATA (event), GST_EVENT_COMMON_FLAG_NEED_RESPONSE); return event; } diff --git a/gst/gstevent.h b/gst/gstevent.h index d10b7f13a3..823a63ce72 100644 --- a/gst/gstevent.h +++ b/gst/gstevent.h @@ -141,6 +141,18 @@ typedef struct gint64 value; } GstFormatValue; +/* FIXME: 0.9 - having GstEventCommonFlag and GstEventFlag is unneeded duplication, + * but for now I have to because the GstEventFlag enum above is not suitable. + */ +typedef enum { + /* Indicates that a result flag is not required. This is most important + * when the event travels upstream through a queue */ + GST_EVENT_COMMON_FLAG_NEED_RESPONSE = GST_DATA_FLAG_LAST, + + /* padding value for future expansion */ + GST_EVENT_COMMON_FLAG_LAST = GST_DATA_FLAG_LAST + 16 +} GstEventCommonFlag; + #define GST_EVENT_SEEK_TYPE(event) (GST_EVENT(event)->event_data.seek.type) #define GST_EVENT_SEEK_FORMAT(event) (GST_EVENT_SEEK_TYPE(event) & GST_SEEK_FORMAT_MASK) #define GST_EVENT_SEEK_METHOD(event) (GST_EVENT_SEEK_TYPE(event) & GST_SEEK_METHOD_MASK) diff --git a/gst/gstqueue.c b/gst/gstqueue.c index 3aeaaaeb11..c449edf03e 100644 --- a/gst/gstqueue.c +++ b/gst/gstqueue.c @@ -379,6 +379,7 @@ gst_queue_finalize (GObject * object) GstQueueEventResponse *er = g_queue_pop_head (queue->events); gst_event_unref (er->event); + g_free (er); } g_mutex_unlock (queue->event_lock); g_mutex_free (queue->event_lock); @@ -508,6 +509,7 @@ gst_queue_handle_pending_events (GstQueue * queue) g_mutex_lock (queue->event_lock); while (!g_queue_is_empty (queue->events)) { GstQueueEventResponse *er; + gboolean need_response; er = g_queue_pop_head (queue->events); @@ -522,9 +524,17 @@ gst_queue_handle_pending_events (GstQueue * queue) break; } g_mutex_unlock (queue->event_lock); + + need_response = + GST_DATA_FLAG_IS_SET (GST_DATA (er->event), + GST_EVENT_COMMON_FLAG_NEED_RESPONSE); er->ret = gst_pad_event_default (queue->srcpad, er->event); - er->handled = TRUE; - g_cond_signal (queue->event_done); + if (need_response) { + er->handled = TRUE; + g_cond_signal (queue->event_done); + } else { + g_free (er); + } g_mutex_lock (queue->event_lock); GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "event sent"); } @@ -901,28 +911,42 @@ gst_queue_handle_src_event (GstPad * pad, GstEvent * event) { GstQueue *queue = GST_QUEUE (gst_pad_get_parent (pad)); gboolean res; + GstQueueEventResponse *er = NULL; GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "got event %p (%d)", event, GST_EVENT_TYPE (event)); GST_QUEUE_MUTEX_LOCK; if (gst_element_get_state (GST_ELEMENT (queue)) == GST_STATE_PLAYING) { - GstQueueEventResponse er; + gboolean need_response = + GST_DATA_FLAG_IS_SET (GST_DATA (event), + GST_EVENT_COMMON_FLAG_NEED_RESPONSE); + + er = g_new (GstQueueEventResponse, 1); /* push the event to the queue and wait for upstream consumption */ - er.event = event; - er.handled = FALSE; + er->event = event; + er->handled = FALSE; + g_mutex_lock (queue->event_lock); GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "putting event %p (%d) on internal queue", event, GST_EVENT_TYPE (event)); - g_queue_push_tail (queue->events, &er); + g_queue_push_tail (queue->events, er); g_mutex_unlock (queue->event_lock); + + if (!need_response) { + /* Leave for upstream to delete */ + er = NULL; + res = TRUE; + goto handled; + } + GST_CAT_WARNING_OBJECT (queue_dataflow, queue, "Preparing for loop for event handler"); /* see the chain function on why this is here - it prevents a deadlock */ g_cond_signal (queue->item_del); - while (!er.handled) { + while (!er->handled) { GTimeVal timeout; g_get_current_time (&timeout); @@ -930,10 +954,10 @@ gst_queue_handle_src_event (GstPad * pad, GstEvent * event) GST_LOG_OBJECT (queue, "doing g_cond_wait using qlock from thread %p", g_thread_self ()); if (!g_cond_timed_wait (queue->event_done, queue->qlock, &timeout) && - !er.handled) { + !er->handled) { GST_CAT_WARNING_OBJECT (queue_dataflow, queue, "timeout in upstream event handling, dropping event %p (%d)", - er.event, GST_EVENT_TYPE (er.event)); + er->event, GST_EVENT_TYPE (er->event)); g_mutex_lock (queue->event_lock); /* since this queue is for src events (ie upstream), this thread is * the only one that is pushing stuff on it, so we're sure that @@ -942,13 +966,13 @@ gst_queue_handle_src_event (GstPad * pad, GstEvent * event) * the list. */ g_queue_pop_tail (queue->events); g_mutex_unlock (queue->event_lock); - gst_event_unref (er.event); + gst_event_unref (er->event); res = FALSE; goto handled; } } GST_CAT_WARNING_OBJECT (queue_dataflow, queue, "Event handled"); - res = er.ret; + res = er->ret; } else { res = gst_pad_event_default (pad, event); @@ -970,6 +994,9 @@ gst_queue_handle_src_event (GstPad * pad, GstEvent * event) handled: GST_QUEUE_MUTEX_UNLOCK; + if (er) + g_free (er); + return res; } diff --git a/plugins/elements/gstqueue.c b/plugins/elements/gstqueue.c index 3aeaaaeb11..c449edf03e 100644 --- a/plugins/elements/gstqueue.c +++ b/plugins/elements/gstqueue.c @@ -379,6 +379,7 @@ gst_queue_finalize (GObject * object) GstQueueEventResponse *er = g_queue_pop_head (queue->events); gst_event_unref (er->event); + g_free (er); } g_mutex_unlock (queue->event_lock); g_mutex_free (queue->event_lock); @@ -508,6 +509,7 @@ gst_queue_handle_pending_events (GstQueue * queue) g_mutex_lock (queue->event_lock); while (!g_queue_is_empty (queue->events)) { GstQueueEventResponse *er; + gboolean need_response; er = g_queue_pop_head (queue->events); @@ -522,9 +524,17 @@ gst_queue_handle_pending_events (GstQueue * queue) break; } g_mutex_unlock (queue->event_lock); + + need_response = + GST_DATA_FLAG_IS_SET (GST_DATA (er->event), + GST_EVENT_COMMON_FLAG_NEED_RESPONSE); er->ret = gst_pad_event_default (queue->srcpad, er->event); - er->handled = TRUE; - g_cond_signal (queue->event_done); + if (need_response) { + er->handled = TRUE; + g_cond_signal (queue->event_done); + } else { + g_free (er); + } g_mutex_lock (queue->event_lock); GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "event sent"); } @@ -901,28 +911,42 @@ gst_queue_handle_src_event (GstPad * pad, GstEvent * event) { GstQueue *queue = GST_QUEUE (gst_pad_get_parent (pad)); gboolean res; + GstQueueEventResponse *er = NULL; GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "got event %p (%d)", event, GST_EVENT_TYPE (event)); GST_QUEUE_MUTEX_LOCK; if (gst_element_get_state (GST_ELEMENT (queue)) == GST_STATE_PLAYING) { - GstQueueEventResponse er; + gboolean need_response = + GST_DATA_FLAG_IS_SET (GST_DATA (event), + GST_EVENT_COMMON_FLAG_NEED_RESPONSE); + + er = g_new (GstQueueEventResponse, 1); /* push the event to the queue and wait for upstream consumption */ - er.event = event; - er.handled = FALSE; + er->event = event; + er->handled = FALSE; + g_mutex_lock (queue->event_lock); GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "putting event %p (%d) on internal queue", event, GST_EVENT_TYPE (event)); - g_queue_push_tail (queue->events, &er); + g_queue_push_tail (queue->events, er); g_mutex_unlock (queue->event_lock); + + if (!need_response) { + /* Leave for upstream to delete */ + er = NULL; + res = TRUE; + goto handled; + } + GST_CAT_WARNING_OBJECT (queue_dataflow, queue, "Preparing for loop for event handler"); /* see the chain function on why this is here - it prevents a deadlock */ g_cond_signal (queue->item_del); - while (!er.handled) { + while (!er->handled) { GTimeVal timeout; g_get_current_time (&timeout); @@ -930,10 +954,10 @@ gst_queue_handle_src_event (GstPad * pad, GstEvent * event) GST_LOG_OBJECT (queue, "doing g_cond_wait using qlock from thread %p", g_thread_self ()); if (!g_cond_timed_wait (queue->event_done, queue->qlock, &timeout) && - !er.handled) { + !er->handled) { GST_CAT_WARNING_OBJECT (queue_dataflow, queue, "timeout in upstream event handling, dropping event %p (%d)", - er.event, GST_EVENT_TYPE (er.event)); + er->event, GST_EVENT_TYPE (er->event)); g_mutex_lock (queue->event_lock); /* since this queue is for src events (ie upstream), this thread is * the only one that is pushing stuff on it, so we're sure that @@ -942,13 +966,13 @@ gst_queue_handle_src_event (GstPad * pad, GstEvent * event) * the list. */ g_queue_pop_tail (queue->events); g_mutex_unlock (queue->event_lock); - gst_event_unref (er.event); + gst_event_unref (er->event); res = FALSE; goto handled; } } GST_CAT_WARNING_OBJECT (queue_dataflow, queue, "Event handled"); - res = er.ret; + res = er->ret; } else { res = gst_pad_event_default (pad, event); @@ -970,6 +994,9 @@ gst_queue_handle_src_event (GstPad * pad, GstEvent * event) handled: GST_QUEUE_MUTEX_UNLOCK; + if (er) + g_free (er); + return res; }