diff --git a/ChangeLog b/ChangeLog index f2e45732d3..acd6b433ac 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,20 @@ +2005-01-08 Ronald S. Bultje + + * docs/gst/gstreamer-sections.txt: + * docs/gst/tmpl/gstevent.sgml: + * gst/gstevent.c: (gst_event_new_filler_stamped), + (gst_event_filler_get_duration): + * gst/gstevent.h: + Add two new functions for filler events (which are used to + synchronize streams if one of them is not having any data + for a while) without interrupting the actual data-stream. + Basically a no-op. + * gst/gstqueue.c: (gst_queue_init), (gst_queue_getcaps), + (gst_queue_link_sink), (gst_queue_link_src), + (gst_queue_change_state): + Allow for renegotiation while filled. Required for stream + switching while playing. + 2005-01-08 Benjamin Otte * gst/gstelement.c: (gst_element_link_many): diff --git a/docs/gst/gstreamer-sections.txt b/docs/gst/gstreamer-sections.txt index c05cf9c0bd..8d39e7cef2 100644 --- a/docs/gst/gstreamer-sections.txt +++ b/docs/gst/gstreamer-sections.txt @@ -598,6 +598,8 @@ gst_event_new_discontinuous gst_event_new_discontinuous_valist gst_event_discont_get_value gst_event_new_filler +gst_event_new_filler_stamped +gst_event_filler_get_duration gst_event_new_flush GST_EVENT diff --git a/docs/gst/tmpl/gstevent.sgml b/docs/gst/tmpl/gstevent.sgml index ccf836958e..5a6c1ad7e7 100644 --- a/docs/gst/tmpl/gstevent.sgml +++ b/docs/gst/tmpl/gstevent.sgml @@ -442,6 +442,25 @@ Create a new dummy event that should be ignored + + + + + +@time: +@duration: +@Returns: + + + + + + + +@event: +@Returns: + + Create a new flush event. diff --git a/gst/gstevent.c b/gst/gstevent.c index a9f6a65f7d..a6ff14c3a8 100644 --- a/gst/gstevent.c +++ b/gst/gstevent.c @@ -25,6 +25,7 @@ #include "gst_private.h" #include "gstdata_private.h" +#include "gstclock.h" #include "gstinfo.h" #include "gstmemchunk.h" #include "gstevent.h" @@ -353,3 +354,72 @@ gst_event_new_segment_seek (GstSeekType type, gint64 start, gint64 stop) return event; } + +/** + * gst_event_new_filler_stamped: + * @time: timestamp of the filler, in nanoseconds. + * @duration: duration of the filler, in nanoseconds. + * + * Creates "filler" data, which is basically empty data that is used to + * synchronize streams if one stream has no data for a while. This is + * used to prevent deadlocks. + * + * Returns: the newly created event. + */ + +GstEvent * +gst_event_new_filler_stamped (guint64 time, guint64 duration) +{ + GstEvent *event = gst_event_new_filler (); + + GST_EVENT_TIMESTAMP (event) = time; + if (GST_CLOCK_TIME_IS_VALID (duration)) { + GValue value = { 0 }; + + event->event_data.structure.structure = + gst_structure_new ("application/x-gst-filler", NULL); + g_value_init (&value, G_TYPE_UINT64); + g_value_set_uint64 (&value, duration); + gst_structure_set_value (event->event_data.structure.structure, + "duration", &value); + g_value_unset (&value); + } + + return event; +} + +/** + * gst_event_filler_get_duration: + * @event: the event to get the duration from. + * + * Filler events are used to synchronize streams (and thereby prevent + * application deadlocks) if one stream receives no data for a while. + * This function gets the duration of a filler event, which is the + * amount of time from the start of this event (see GST_EVENT_TIMESTAMP()) + * that no data is available. + * + * Returns: duration of the lack of data, or GST_CLOCK_TIME_NONE. + */ + +guint64 +gst_event_filler_get_duration (GstEvent * event) +{ + const GValue *value; + + g_return_val_if_fail (event != NULL, GST_CLOCK_TIME_NONE); + g_return_val_if_fail (GST_EVENT_TYPE (event) == GST_EVENT_FILLER, + GST_CLOCK_TIME_NONE); + + /* check the event */ + if (!event->event_data.structure.structure) + return GST_CLOCK_TIME_NONE; + value = gst_structure_get_value (event->event_data.structure.structure, + "duration"); + if (!value) + return GST_CLOCK_TIME_NONE; + g_return_val_if_fail (G_VALUE_TYPE (value) == G_TYPE_UINT64, + GST_CLOCK_TIME_NONE); + + /* return */ + return g_value_get_uint64 (value); +} diff --git a/gst/gstevent.h b/gst/gstevent.h index c8e4b5a7ba..d10b7f13a3 100644 --- a/gst/gstevent.h +++ b/gst/gstevent.h @@ -224,6 +224,9 @@ GstEvent* gst_event_new_discontinuous_valist (gboolean new_media, gboolean gst_event_discont_get_value (GstEvent *event, GstFormat format, gint64 *value); #define gst_event_new_filler() gst_event_new(GST_EVENT_FILLER) +GstEvent* gst_event_new_filler_stamped (guint64 time, + guint64 duration); +guint64 gst_event_filler_get_duration (GstEvent *event); /* flush events */ #define gst_event_new_flush() gst_event_new(GST_EVENT_FLUSH) diff --git a/gst/gstqueue.c b/gst/gstqueue.c index 17ee2ccc11..11662bd4a7 100644 --- a/gst/gstqueue.c +++ b/gst/gstqueue.c @@ -41,6 +41,24 @@ static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src", GST_STATIC_CAPS_ANY); GST_DEBUG_CATEGORY_STATIC (queue_dataflow); +#define GST_CAT_DEFAULT (queue_dataflow) + +#define STATUS(queue, msg) \ + GST_CAT_LOG_OBJECT (queue_dataflow, queue, \ + "(%s:%s) " msg ": %u of %u-%u buffers, %u of %u-%u " \ + "bytes, %" G_GUINT64_FORMAT " of %" G_GUINT64_FORMAT \ + "-%" G_GUINT64_FORMAT " ns, %u elements", \ + GST_DEBUG_PAD_NAME (pad), \ + queue->cur_level.buffers, \ + queue->min_threshold.buffers, \ + queue->max_size.buffers, \ + queue->cur_level.bytes, \ + queue->min_threshold.bytes, \ + queue->max_size.bytes, \ + queue->cur_level.time, \ + queue->min_threshold.time, \ + queue->max_size.time, \ + queue->queue->length) static GstElementDetails gst_queue_details = GST_ELEMENT_DETAILS ("Queue", "Generic", @@ -120,7 +138,9 @@ static gboolean gst_queue_handle_src_query (GstPad * pad, GstQueryType type, GstFormat * fmt, gint64 * value); static GstCaps *gst_queue_getcaps (GstPad * pad); -static GstPadLinkReturn gst_queue_link (GstPad * pad, const GstCaps * caps); +static GstPadLinkReturn +gst_queue_link_sink (GstPad * pad, const GstCaps * caps); +static GstPadLinkReturn gst_queue_link_src (GstPad * pad, const GstCaps * caps); static void gst_queue_locked_flush (GstQueue * queue); static GstElementStateReturn gst_queue_change_state (GstElement * element); @@ -288,7 +308,7 @@ gst_queue_init (GstQueue * queue) GST_DEBUG_FUNCPTR (gst_queue_chain)); gst_element_add_pad (GST_ELEMENT (queue), queue->sinkpad); gst_pad_set_link_function (queue->sinkpad, - GST_DEBUG_FUNCPTR (gst_queue_link)); + GST_DEBUG_FUNCPTR (gst_queue_link_sink)); gst_pad_set_getcaps_function (queue->sinkpad, GST_DEBUG_FUNCPTR (gst_queue_getcaps)); gst_pad_set_active (queue->sinkpad, TRUE); @@ -298,7 +318,8 @@ gst_queue_init (GstQueue * queue) "src"); gst_pad_set_get_function (queue->srcpad, GST_DEBUG_FUNCPTR (gst_queue_get)); gst_element_add_pad (GST_ELEMENT (queue), queue->srcpad); - gst_pad_set_link_function (queue->srcpad, GST_DEBUG_FUNCPTR (gst_queue_link)); + gst_pad_set_link_function (queue->srcpad, + GST_DEBUG_FUNCPTR (gst_queue_link_src)); gst_pad_set_getcaps_function (queue->srcpad, GST_DEBUG_FUNCPTR (gst_queue_getcaps)); gst_pad_set_event_function (queue->srcpad, @@ -374,7 +395,7 @@ gst_queue_getcaps (GstPad * pad) queue = GST_QUEUE (gst_pad_get_parent (pad)); - if (queue->cur_level.bytes > 0) { + if (pad == queue->srcpad && queue->cur_level.bytes > 0) { return gst_caps_copy (queue->negotiated_caps); } @@ -382,7 +403,50 @@ gst_queue_getcaps (GstPad * pad) } static GstPadLinkReturn -gst_queue_link (GstPad * pad, const GstCaps * caps) +gst_queue_link_sink (GstPad * pad, const GstCaps * caps) +{ + GstQueue *queue; + GstPadLinkReturn link_ret; + + queue = GST_QUEUE (gst_pad_get_parent (pad)); + + if (queue->cur_level.bytes > 0) { + if (gst_caps_is_equal (caps, queue->negotiated_caps)) { + return GST_PAD_LINK_OK; + } else if (GST_STATE (queue) != GST_STATE_PLAYING) { + return GST_PAD_LINK_DELAYED; + } + + /* Wait until the queue is empty before attempting the pad + negotiation. */ + GST_QUEUE_MUTEX_LOCK; + + STATUS (queue, "waiting for queue to get empty"); + while (queue->cur_level.bytes > 0) { + g_cond_wait (queue->item_del, queue->qlock); + if (queue->interrupt) { + GST_QUEUE_MUTEX_UNLOCK; + return GST_PAD_LINK_DELAYED; + } + } + STATUS (queue, "queue is now empty"); + + GST_QUEUE_MUTEX_UNLOCK; + } + + link_ret = gst_pad_proxy_pad_link (pad, caps); + + if (GST_PAD_LINK_SUCCESSFUL (link_ret)) { + /* we store an extra copy of the negotiated caps, just in case + * the pads become unnegotiated while we have buffers */ + gst_caps_replace (&queue->negotiated_caps, gst_caps_copy (caps)); + } + + return link_ret; +} + +static GstPadLinkReturn +gst_queue_link_src (GstPad * pad, const GstCaps * caps) { GstQueue *queue; GstPadLinkReturn link_ret; @@ -465,23 +529,6 @@ gst_queue_handle_pending_events (GstQueue * queue) g_mutex_unlock (queue->event_lock); } -#define STATUS(queue, msg) \ - GST_CAT_LOG_OBJECT (queue_dataflow, queue, \ - "(%s:%s) " msg ": %u of %u-%u buffers, %u of %u-%u " \ - "bytes, %" G_GUINT64_FORMAT " of %" G_GUINT64_FORMAT \ - "-%" G_GUINT64_FORMAT " ns, %u elements", \ - GST_DEBUG_PAD_NAME (pad), \ - queue->cur_level.buffers, \ - queue->min_threshold.buffers, \ - queue->max_size.buffers, \ - queue->cur_level.bytes, \ - queue->min_threshold.bytes, \ - queue->max_size.bytes, \ - queue->cur_level.time, \ - queue->min_threshold.time, \ - queue->max_size.time, \ - queue->queue->length) - static void gst_queue_chain (GstPad * pad, GstData * data) { @@ -961,7 +1008,8 @@ gst_queue_change_state (GstElement * element) queue = GST_QUEUE (element); - GST_CAT_LOG_OBJECT (GST_CAT_STATES, element, "starting state change"); + GST_CAT_LOG_OBJECT (GST_CAT_STATES, element, + "starting state change 0x%x", GST_STATE_TRANSITION (element)); /* lock the queue so another thread (not in sync with this thread's state) * can't call this queue's _get (or whatever) @@ -1009,6 +1057,8 @@ gst_queue_change_state (GstElement * element) break; } + GST_QUEUE_MUTEX_UNLOCK; + if (GST_ELEMENT_CLASS (parent_class)->change_state) ret = GST_ELEMENT_CLASS (parent_class)->change_state (element); @@ -1018,6 +1068,10 @@ gst_queue_change_state (GstElement * element) gst_pad_set_active (queue->sinkpad, TRUE); gst_pad_set_active (queue->srcpad, TRUE); + GST_CAT_LOG_OBJECT (GST_CAT_STATES, element, "done with state change"); + + return ret; + unlock: GST_QUEUE_MUTEX_UNLOCK; diff --git a/plugins/elements/gstqueue.c b/plugins/elements/gstqueue.c index 17ee2ccc11..11662bd4a7 100644 --- a/plugins/elements/gstqueue.c +++ b/plugins/elements/gstqueue.c @@ -41,6 +41,24 @@ static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src", GST_STATIC_CAPS_ANY); GST_DEBUG_CATEGORY_STATIC (queue_dataflow); +#define GST_CAT_DEFAULT (queue_dataflow) + +#define STATUS(queue, msg) \ + GST_CAT_LOG_OBJECT (queue_dataflow, queue, \ + "(%s:%s) " msg ": %u of %u-%u buffers, %u of %u-%u " \ + "bytes, %" G_GUINT64_FORMAT " of %" G_GUINT64_FORMAT \ + "-%" G_GUINT64_FORMAT " ns, %u elements", \ + GST_DEBUG_PAD_NAME (pad), \ + queue->cur_level.buffers, \ + queue->min_threshold.buffers, \ + queue->max_size.buffers, \ + queue->cur_level.bytes, \ + queue->min_threshold.bytes, \ + queue->max_size.bytes, \ + queue->cur_level.time, \ + queue->min_threshold.time, \ + queue->max_size.time, \ + queue->queue->length) static GstElementDetails gst_queue_details = GST_ELEMENT_DETAILS ("Queue", "Generic", @@ -120,7 +138,9 @@ static gboolean gst_queue_handle_src_query (GstPad * pad, GstQueryType type, GstFormat * fmt, gint64 * value); static GstCaps *gst_queue_getcaps (GstPad * pad); -static GstPadLinkReturn gst_queue_link (GstPad * pad, const GstCaps * caps); +static GstPadLinkReturn +gst_queue_link_sink (GstPad * pad, const GstCaps * caps); +static GstPadLinkReturn gst_queue_link_src (GstPad * pad, const GstCaps * caps); static void gst_queue_locked_flush (GstQueue * queue); static GstElementStateReturn gst_queue_change_state (GstElement * element); @@ -288,7 +308,7 @@ gst_queue_init (GstQueue * queue) GST_DEBUG_FUNCPTR (gst_queue_chain)); gst_element_add_pad (GST_ELEMENT (queue), queue->sinkpad); gst_pad_set_link_function (queue->sinkpad, - GST_DEBUG_FUNCPTR (gst_queue_link)); + GST_DEBUG_FUNCPTR (gst_queue_link_sink)); gst_pad_set_getcaps_function (queue->sinkpad, GST_DEBUG_FUNCPTR (gst_queue_getcaps)); gst_pad_set_active (queue->sinkpad, TRUE); @@ -298,7 +318,8 @@ gst_queue_init (GstQueue * queue) "src"); gst_pad_set_get_function (queue->srcpad, GST_DEBUG_FUNCPTR (gst_queue_get)); gst_element_add_pad (GST_ELEMENT (queue), queue->srcpad); - gst_pad_set_link_function (queue->srcpad, GST_DEBUG_FUNCPTR (gst_queue_link)); + gst_pad_set_link_function (queue->srcpad, + GST_DEBUG_FUNCPTR (gst_queue_link_src)); gst_pad_set_getcaps_function (queue->srcpad, GST_DEBUG_FUNCPTR (gst_queue_getcaps)); gst_pad_set_event_function (queue->srcpad, @@ -374,7 +395,7 @@ gst_queue_getcaps (GstPad * pad) queue = GST_QUEUE (gst_pad_get_parent (pad)); - if (queue->cur_level.bytes > 0) { + if (pad == queue->srcpad && queue->cur_level.bytes > 0) { return gst_caps_copy (queue->negotiated_caps); } @@ -382,7 +403,50 @@ gst_queue_getcaps (GstPad * pad) } static GstPadLinkReturn -gst_queue_link (GstPad * pad, const GstCaps * caps) +gst_queue_link_sink (GstPad * pad, const GstCaps * caps) +{ + GstQueue *queue; + GstPadLinkReturn link_ret; + + queue = GST_QUEUE (gst_pad_get_parent (pad)); + + if (queue->cur_level.bytes > 0) { + if (gst_caps_is_equal (caps, queue->negotiated_caps)) { + return GST_PAD_LINK_OK; + } else if (GST_STATE (queue) != GST_STATE_PLAYING) { + return GST_PAD_LINK_DELAYED; + } + + /* Wait until the queue is empty before attempting the pad + negotiation. */ + GST_QUEUE_MUTEX_LOCK; + + STATUS (queue, "waiting for queue to get empty"); + while (queue->cur_level.bytes > 0) { + g_cond_wait (queue->item_del, queue->qlock); + if (queue->interrupt) { + GST_QUEUE_MUTEX_UNLOCK; + return GST_PAD_LINK_DELAYED; + } + } + STATUS (queue, "queue is now empty"); + + GST_QUEUE_MUTEX_UNLOCK; + } + + link_ret = gst_pad_proxy_pad_link (pad, caps); + + if (GST_PAD_LINK_SUCCESSFUL (link_ret)) { + /* we store an extra copy of the negotiated caps, just in case + * the pads become unnegotiated while we have buffers */ + gst_caps_replace (&queue->negotiated_caps, gst_caps_copy (caps)); + } + + return link_ret; +} + +static GstPadLinkReturn +gst_queue_link_src (GstPad * pad, const GstCaps * caps) { GstQueue *queue; GstPadLinkReturn link_ret; @@ -465,23 +529,6 @@ gst_queue_handle_pending_events (GstQueue * queue) g_mutex_unlock (queue->event_lock); } -#define STATUS(queue, msg) \ - GST_CAT_LOG_OBJECT (queue_dataflow, queue, \ - "(%s:%s) " msg ": %u of %u-%u buffers, %u of %u-%u " \ - "bytes, %" G_GUINT64_FORMAT " of %" G_GUINT64_FORMAT \ - "-%" G_GUINT64_FORMAT " ns, %u elements", \ - GST_DEBUG_PAD_NAME (pad), \ - queue->cur_level.buffers, \ - queue->min_threshold.buffers, \ - queue->max_size.buffers, \ - queue->cur_level.bytes, \ - queue->min_threshold.bytes, \ - queue->max_size.bytes, \ - queue->cur_level.time, \ - queue->min_threshold.time, \ - queue->max_size.time, \ - queue->queue->length) - static void gst_queue_chain (GstPad * pad, GstData * data) { @@ -961,7 +1008,8 @@ gst_queue_change_state (GstElement * element) queue = GST_QUEUE (element); - GST_CAT_LOG_OBJECT (GST_CAT_STATES, element, "starting state change"); + GST_CAT_LOG_OBJECT (GST_CAT_STATES, element, + "starting state change 0x%x", GST_STATE_TRANSITION (element)); /* lock the queue so another thread (not in sync with this thread's state) * can't call this queue's _get (or whatever) @@ -1009,6 +1057,8 @@ gst_queue_change_state (GstElement * element) break; } + GST_QUEUE_MUTEX_UNLOCK; + if (GST_ELEMENT_CLASS (parent_class)->change_state) ret = GST_ELEMENT_CLASS (parent_class)->change_state (element); @@ -1018,6 +1068,10 @@ gst_queue_change_state (GstElement * element) gst_pad_set_active (queue->sinkpad, TRUE); gst_pad_set_active (queue->srcpad, TRUE); + GST_CAT_LOG_OBJECT (GST_CAT_STATES, element, "done with state change"); + + return ret; + unlock: GST_QUEUE_MUTEX_UNLOCK;