Add GST_EVENT_COMMON_FLAG_NEED_RESPONSE flag (TRUE by default) to indicate whether a reply is expected to an upstream...

Original commit message from CVS:
* docs/gst/gstreamer-sections.txt:
* docs/gst/tmpl/gstevent.sgml:
* gst/gstevent.c: (gst_event_new):
* gst/gstevent.h:
Add GST_EVENT_COMMON_FLAG_NEED_RESPONSE flag (TRUE by default)
to indicate whether a reply is expected to an upstream event.

* gst/gstqueue.c: (gst_queue_finalize),
(gst_queue_handle_pending_events), (gst_queue_handle_src_event):
If GST_EVENT_COMMON_FLAG_NEED_RESPONSE is not set, just queue the
event without blocking on a reply.
This commit is contained in:
Jan Schmidt 2005-03-09 11:45:03 +00:00
parent 32c339c396
commit 009d01a841
7 changed files with 114 additions and 36 deletions

View file

@ -1,3 +1,17 @@
2005-03-09 Jan Schmidt <thaytan@mad.scientist.com>
* docs/gst/gstreamer-sections.txt:
* docs/gst/tmpl/gstevent.sgml:
* gst/gstevent.c: (gst_event_new):
* gst/gstevent.h:
Add GST_EVENT_COMMON_FLAG_NEED_RESPONSE flag (TRUE by default)
to indicate whether a reply is expected to an upstream event.
* gst/gstqueue.c: (gst_queue_finalize),
(gst_queue_handle_pending_events), (gst_queue_handle_src_event):
If GST_EVENT_COMMON_FLAG_NEED_RESPONSE is not set, just queue the
event without blocking on a reply.
2005-03-09 Ronald S. Bultje <rbultje@ronald.bitfreak.net> 2005-03-09 Ronald S. Bultje <rbultje@ronald.bitfreak.net>
* docs/gst/Makefile.am: * docs/gst/Makefile.am:
@ -61,20 +75,6 @@
Return INTERRUPT event if the peer is unlinked when restarting Return INTERRUPT event if the peer is unlinked when restarting
during a pad_pull. during a pad_pull.
2005-03-04 Jan Schmidt <thaytan@mad.scientist.com>
* docs/gst/gstreamer-sections.txt:
* docs/gst/tmpl/gstevent.sgml:
* gst/gstevent.c: (gst_event_new):
* gst/gstevent.h:
Add GST_EVENT_COMMON_FLAG_NEED_RESPONSE flag (TRUE by default)
to indicate whether a reply is expected to an upstream event.
* gst/gstqueue.c: (gst_queue_finalize),
(gst_queue_handle_pending_events), (gst_queue_handle_src_event):
If GST_EVENT_COMMON_FLAG_NEED_RESPONSE is not set, just queue the
event without blocking on a reply.
2005-03-03 Ronald S. Bultje <rbultje@ronald.bitfreak.net> 2005-03-03 Ronald S. Bultje <rbultje@ronald.bitfreak.net>
* gst/gstqueue.c: (gst_queue_chain): * gst/gstqueue.c: (gst_queue_chain):

View file

@ -573,6 +573,7 @@ GST_SEEK_FORMAT_MASK
GST_SEEK_METHOD_MASK GST_SEEK_METHOD_MASK
GST_SEEK_FLAGS_MASK GST_SEEK_FLAGS_MASK
GstEventFlag GstEventFlag
GstEventCommonFlag
GST_EVENT_MASK_FUNCTION GST_EVENT_MASK_FUNCTION
GstSeekType GstSeekType
GstSeekAccuracy GstSeekAccuracy
@ -613,9 +614,11 @@ GST_TYPE_EVENT_FLAG
GST_TYPE_EVENT_TYPE GST_TYPE_EVENT_TYPE
GST_TYPE_SEEK_ACCURACY GST_TYPE_SEEK_ACCURACY
GST_TYPE_SEEK_TYPE GST_TYPE_SEEK_TYPE
GST_TYPE_EVENT_COMMON_FLAG
<SUBSECTION Private> <SUBSECTION Private>
gst_event_get_type gst_event_get_type
gst_event_flag_get_type gst_event_flag_get_type
gst_event_common_flag_get_type
gst_event_type_get_type gst_event_type_get_type
gst_seek_accuracy_get_type gst_seek_accuracy_get_type
gst_seek_type_get_type gst_seek_type_get_type

View file

@ -156,6 +156,14 @@ Event flags are used when querying for supported events
@GST_EVENT_FLAG_NONE: no value @GST_EVENT_FLAG_NONE: no value
@GST_RATE_FLAG_NEGATIVE: indicates negative rates are supported @GST_RATE_FLAG_NEGATIVE: indicates negative rates are supported
<!-- ##### ENUM GstEventCommonFlag ##### -->
<para>
</para>
@GST_EVENT_COMMON_FLAG_NEED_RESPONSE:
@GST_EVENT_COMMON_FLAG_LAST:
<!-- ##### MACRO GST_EVENT_MASK_FUNCTION ##### --> <!-- ##### MACRO GST_EVENT_MASK_FUNCTION ##### -->
<para> <para>
A convenience macro to create event mask functions A convenience macro to create event mask functions

View file

@ -183,6 +183,7 @@ gst_event_new (GstEventType type)
GST_EVENT_TYPE (event) = type; GST_EVENT_TYPE (event) = type;
GST_EVENT_TIMESTAMP (event) = G_GINT64_CONSTANT (0); GST_EVENT_TIMESTAMP (event) = G_GINT64_CONSTANT (0);
GST_EVENT_SRC (event) = NULL; GST_EVENT_SRC (event) = NULL;
GST_DATA_FLAG_SET (GST_DATA (event), GST_EVENT_COMMON_FLAG_NEED_RESPONSE);
return event; return event;
} }

View file

@ -141,6 +141,18 @@ typedef struct
gint64 value; gint64 value;
} GstFormatValue; } GstFormatValue;
/* FIXME: 0.9 - having GstEventCommonFlag and GstEventFlag is unneeded duplication,
* but for now I have to because the GstEventFlag enum above is not suitable.
*/
typedef enum {
/* Indicates that a result flag is not required. This is most important
* when the event travels upstream through a queue */
GST_EVENT_COMMON_FLAG_NEED_RESPONSE = GST_DATA_FLAG_LAST,
/* padding value for future expansion */
GST_EVENT_COMMON_FLAG_LAST = GST_DATA_FLAG_LAST + 16
} GstEventCommonFlag;
#define GST_EVENT_SEEK_TYPE(event) (GST_EVENT(event)->event_data.seek.type) #define GST_EVENT_SEEK_TYPE(event) (GST_EVENT(event)->event_data.seek.type)
#define GST_EVENT_SEEK_FORMAT(event) (GST_EVENT_SEEK_TYPE(event) & GST_SEEK_FORMAT_MASK) #define GST_EVENT_SEEK_FORMAT(event) (GST_EVENT_SEEK_TYPE(event) & GST_SEEK_FORMAT_MASK)
#define GST_EVENT_SEEK_METHOD(event) (GST_EVENT_SEEK_TYPE(event) & GST_SEEK_METHOD_MASK) #define GST_EVENT_SEEK_METHOD(event) (GST_EVENT_SEEK_TYPE(event) & GST_SEEK_METHOD_MASK)

View file

@ -379,6 +379,7 @@ gst_queue_finalize (GObject * object)
GstQueueEventResponse *er = g_queue_pop_head (queue->events); GstQueueEventResponse *er = g_queue_pop_head (queue->events);
gst_event_unref (er->event); gst_event_unref (er->event);
g_free (er);
} }
g_mutex_unlock (queue->event_lock); g_mutex_unlock (queue->event_lock);
g_mutex_free (queue->event_lock); g_mutex_free (queue->event_lock);
@ -508,6 +509,7 @@ gst_queue_handle_pending_events (GstQueue * queue)
g_mutex_lock (queue->event_lock); g_mutex_lock (queue->event_lock);
while (!g_queue_is_empty (queue->events)) { while (!g_queue_is_empty (queue->events)) {
GstQueueEventResponse *er; GstQueueEventResponse *er;
gboolean need_response;
er = g_queue_pop_head (queue->events); er = g_queue_pop_head (queue->events);
@ -522,9 +524,17 @@ gst_queue_handle_pending_events (GstQueue * queue)
break; break;
} }
g_mutex_unlock (queue->event_lock); g_mutex_unlock (queue->event_lock);
need_response =
GST_DATA_FLAG_IS_SET (GST_DATA (er->event),
GST_EVENT_COMMON_FLAG_NEED_RESPONSE);
er->ret = gst_pad_event_default (queue->srcpad, er->event); er->ret = gst_pad_event_default (queue->srcpad, er->event);
er->handled = TRUE; if (need_response) {
g_cond_signal (queue->event_done); er->handled = TRUE;
g_cond_signal (queue->event_done);
} else {
g_free (er);
}
g_mutex_lock (queue->event_lock); g_mutex_lock (queue->event_lock);
GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "event sent"); GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "event sent");
} }
@ -901,28 +911,42 @@ gst_queue_handle_src_event (GstPad * pad, GstEvent * event)
{ {
GstQueue *queue = GST_QUEUE (gst_pad_get_parent (pad)); GstQueue *queue = GST_QUEUE (gst_pad_get_parent (pad));
gboolean res; gboolean res;
GstQueueEventResponse *er = NULL;
GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "got event %p (%d)", GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "got event %p (%d)",
event, GST_EVENT_TYPE (event)); event, GST_EVENT_TYPE (event));
GST_QUEUE_MUTEX_LOCK; GST_QUEUE_MUTEX_LOCK;
if (gst_element_get_state (GST_ELEMENT (queue)) == GST_STATE_PLAYING) { if (gst_element_get_state (GST_ELEMENT (queue)) == GST_STATE_PLAYING) {
GstQueueEventResponse er; gboolean need_response =
GST_DATA_FLAG_IS_SET (GST_DATA (event),
GST_EVENT_COMMON_FLAG_NEED_RESPONSE);
er = g_new (GstQueueEventResponse, 1);
/* push the event to the queue and wait for upstream consumption */ /* push the event to the queue and wait for upstream consumption */
er.event = event; er->event = event;
er.handled = FALSE; er->handled = FALSE;
g_mutex_lock (queue->event_lock); g_mutex_lock (queue->event_lock);
GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
"putting event %p (%d) on internal queue", event, "putting event %p (%d) on internal queue", event,
GST_EVENT_TYPE (event)); GST_EVENT_TYPE (event));
g_queue_push_tail (queue->events, &er); g_queue_push_tail (queue->events, er);
g_mutex_unlock (queue->event_lock); g_mutex_unlock (queue->event_lock);
if (!need_response) {
/* Leave for upstream to delete */
er = NULL;
res = TRUE;
goto handled;
}
GST_CAT_WARNING_OBJECT (queue_dataflow, queue, GST_CAT_WARNING_OBJECT (queue_dataflow, queue,
"Preparing for loop for event handler"); "Preparing for loop for event handler");
/* see the chain function on why this is here - it prevents a deadlock */ /* see the chain function on why this is here - it prevents a deadlock */
g_cond_signal (queue->item_del); g_cond_signal (queue->item_del);
while (!er.handled) { while (!er->handled) {
GTimeVal timeout; GTimeVal timeout;
g_get_current_time (&timeout); g_get_current_time (&timeout);
@ -930,10 +954,10 @@ gst_queue_handle_src_event (GstPad * pad, GstEvent * event)
GST_LOG_OBJECT (queue, "doing g_cond_wait using qlock from thread %p", GST_LOG_OBJECT (queue, "doing g_cond_wait using qlock from thread %p",
g_thread_self ()); g_thread_self ());
if (!g_cond_timed_wait (queue->event_done, queue->qlock, &timeout) && if (!g_cond_timed_wait (queue->event_done, queue->qlock, &timeout) &&
!er.handled) { !er->handled) {
GST_CAT_WARNING_OBJECT (queue_dataflow, queue, GST_CAT_WARNING_OBJECT (queue_dataflow, queue,
"timeout in upstream event handling, dropping event %p (%d)", "timeout in upstream event handling, dropping event %p (%d)",
er.event, GST_EVENT_TYPE (er.event)); er->event, GST_EVENT_TYPE (er->event));
g_mutex_lock (queue->event_lock); g_mutex_lock (queue->event_lock);
/* since this queue is for src events (ie upstream), this thread is /* since this queue is for src events (ie upstream), this thread is
* the only one that is pushing stuff on it, so we're sure that * the only one that is pushing stuff on it, so we're sure that
@ -942,13 +966,13 @@ gst_queue_handle_src_event (GstPad * pad, GstEvent * event)
* the list. */ * the list. */
g_queue_pop_tail (queue->events); g_queue_pop_tail (queue->events);
g_mutex_unlock (queue->event_lock); g_mutex_unlock (queue->event_lock);
gst_event_unref (er.event); gst_event_unref (er->event);
res = FALSE; res = FALSE;
goto handled; goto handled;
} }
} }
GST_CAT_WARNING_OBJECT (queue_dataflow, queue, "Event handled"); GST_CAT_WARNING_OBJECT (queue_dataflow, queue, "Event handled");
res = er.ret; res = er->ret;
} else { } else {
res = gst_pad_event_default (pad, event); res = gst_pad_event_default (pad, event);
@ -970,6 +994,9 @@ gst_queue_handle_src_event (GstPad * pad, GstEvent * event)
handled: handled:
GST_QUEUE_MUTEX_UNLOCK; GST_QUEUE_MUTEX_UNLOCK;
if (er)
g_free (er);
return res; return res;
} }

View file

@ -379,6 +379,7 @@ gst_queue_finalize (GObject * object)
GstQueueEventResponse *er = g_queue_pop_head (queue->events); GstQueueEventResponse *er = g_queue_pop_head (queue->events);
gst_event_unref (er->event); gst_event_unref (er->event);
g_free (er);
} }
g_mutex_unlock (queue->event_lock); g_mutex_unlock (queue->event_lock);
g_mutex_free (queue->event_lock); g_mutex_free (queue->event_lock);
@ -508,6 +509,7 @@ gst_queue_handle_pending_events (GstQueue * queue)
g_mutex_lock (queue->event_lock); g_mutex_lock (queue->event_lock);
while (!g_queue_is_empty (queue->events)) { while (!g_queue_is_empty (queue->events)) {
GstQueueEventResponse *er; GstQueueEventResponse *er;
gboolean need_response;
er = g_queue_pop_head (queue->events); er = g_queue_pop_head (queue->events);
@ -522,9 +524,17 @@ gst_queue_handle_pending_events (GstQueue * queue)
break; break;
} }
g_mutex_unlock (queue->event_lock); g_mutex_unlock (queue->event_lock);
need_response =
GST_DATA_FLAG_IS_SET (GST_DATA (er->event),
GST_EVENT_COMMON_FLAG_NEED_RESPONSE);
er->ret = gst_pad_event_default (queue->srcpad, er->event); er->ret = gst_pad_event_default (queue->srcpad, er->event);
er->handled = TRUE; if (need_response) {
g_cond_signal (queue->event_done); er->handled = TRUE;
g_cond_signal (queue->event_done);
} else {
g_free (er);
}
g_mutex_lock (queue->event_lock); g_mutex_lock (queue->event_lock);
GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "event sent"); GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "event sent");
} }
@ -901,28 +911,42 @@ gst_queue_handle_src_event (GstPad * pad, GstEvent * event)
{ {
GstQueue *queue = GST_QUEUE (gst_pad_get_parent (pad)); GstQueue *queue = GST_QUEUE (gst_pad_get_parent (pad));
gboolean res; gboolean res;
GstQueueEventResponse *er = NULL;
GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "got event %p (%d)", GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "got event %p (%d)",
event, GST_EVENT_TYPE (event)); event, GST_EVENT_TYPE (event));
GST_QUEUE_MUTEX_LOCK; GST_QUEUE_MUTEX_LOCK;
if (gst_element_get_state (GST_ELEMENT (queue)) == GST_STATE_PLAYING) { if (gst_element_get_state (GST_ELEMENT (queue)) == GST_STATE_PLAYING) {
GstQueueEventResponse er; gboolean need_response =
GST_DATA_FLAG_IS_SET (GST_DATA (event),
GST_EVENT_COMMON_FLAG_NEED_RESPONSE);
er = g_new (GstQueueEventResponse, 1);
/* push the event to the queue and wait for upstream consumption */ /* push the event to the queue and wait for upstream consumption */
er.event = event; er->event = event;
er.handled = FALSE; er->handled = FALSE;
g_mutex_lock (queue->event_lock); g_mutex_lock (queue->event_lock);
GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
"putting event %p (%d) on internal queue", event, "putting event %p (%d) on internal queue", event,
GST_EVENT_TYPE (event)); GST_EVENT_TYPE (event));
g_queue_push_tail (queue->events, &er); g_queue_push_tail (queue->events, er);
g_mutex_unlock (queue->event_lock); g_mutex_unlock (queue->event_lock);
if (!need_response) {
/* Leave for upstream to delete */
er = NULL;
res = TRUE;
goto handled;
}
GST_CAT_WARNING_OBJECT (queue_dataflow, queue, GST_CAT_WARNING_OBJECT (queue_dataflow, queue,
"Preparing for loop for event handler"); "Preparing for loop for event handler");
/* see the chain function on why this is here - it prevents a deadlock */ /* see the chain function on why this is here - it prevents a deadlock */
g_cond_signal (queue->item_del); g_cond_signal (queue->item_del);
while (!er.handled) { while (!er->handled) {
GTimeVal timeout; GTimeVal timeout;
g_get_current_time (&timeout); g_get_current_time (&timeout);
@ -930,10 +954,10 @@ gst_queue_handle_src_event (GstPad * pad, GstEvent * event)
GST_LOG_OBJECT (queue, "doing g_cond_wait using qlock from thread %p", GST_LOG_OBJECT (queue, "doing g_cond_wait using qlock from thread %p",
g_thread_self ()); g_thread_self ());
if (!g_cond_timed_wait (queue->event_done, queue->qlock, &timeout) && if (!g_cond_timed_wait (queue->event_done, queue->qlock, &timeout) &&
!er.handled) { !er->handled) {
GST_CAT_WARNING_OBJECT (queue_dataflow, queue, GST_CAT_WARNING_OBJECT (queue_dataflow, queue,
"timeout in upstream event handling, dropping event %p (%d)", "timeout in upstream event handling, dropping event %p (%d)",
er.event, GST_EVENT_TYPE (er.event)); er->event, GST_EVENT_TYPE (er->event));
g_mutex_lock (queue->event_lock); g_mutex_lock (queue->event_lock);
/* since this queue is for src events (ie upstream), this thread is /* since this queue is for src events (ie upstream), this thread is
* the only one that is pushing stuff on it, so we're sure that * the only one that is pushing stuff on it, so we're sure that
@ -942,13 +966,13 @@ gst_queue_handle_src_event (GstPad * pad, GstEvent * event)
* the list. */ * the list. */
g_queue_pop_tail (queue->events); g_queue_pop_tail (queue->events);
g_mutex_unlock (queue->event_lock); g_mutex_unlock (queue->event_lock);
gst_event_unref (er.event); gst_event_unref (er->event);
res = FALSE; res = FALSE;
goto handled; goto handled;
} }
} }
GST_CAT_WARNING_OBJECT (queue_dataflow, queue, "Event handled"); GST_CAT_WARNING_OBJECT (queue_dataflow, queue, "Event handled");
res = er.ret; res = er->ret;
} else { } else {
res = gst_pad_event_default (pad, event); res = gst_pad_event_default (pad, event);
@ -970,6 +994,9 @@ gst_queue_handle_src_event (GstPad * pad, GstEvent * event)
handled: handled:
GST_QUEUE_MUTEX_UNLOCK; GST_QUEUE_MUTEX_UNLOCK;
if (er)
g_free (er);
return res; return res;
} }