Add two new functions for filler events (which are used to synchronize streams if one of them is not having any data ...

Original commit message from CVS:
* docs/gst/gstreamer-sections.txt:
* docs/gst/tmpl/gstevent.sgml:
* gst/gstevent.c: (gst_event_new_filler_stamped),
(gst_event_filler_get_duration):
* gst/gstevent.h:
Add two new functions for filler events (which are used to
synchronize streams if one of them is not having any data
for a while) without interrupting the actual data-stream.
Basically a no-op.
* gst/gstqueue.c: (gst_queue_init), (gst_queue_getcaps),
(gst_queue_link_sink), (gst_queue_link_src),
(gst_queue_change_state):
Allow for renegotiation while filled. Required for stream
switching while playing.
This commit is contained in:
Ronald S. Bultje 2005-01-08 18:10:50 +00:00
parent ba4b71e86a
commit 66d5da4f17
7 changed files with 265 additions and 46 deletions

View file

@ -1,3 +1,20 @@
2005-01-08 Ronald S. Bultje <rbultje@ronald.bitfreak.net>
* docs/gst/gstreamer-sections.txt:
* docs/gst/tmpl/gstevent.sgml:
* gst/gstevent.c: (gst_event_new_filler_stamped),
(gst_event_filler_get_duration):
* gst/gstevent.h:
Add two new functions for filler events (which are used to
synchronize streams if one of them is not having any data
for a while) without interrupting the actual data-stream.
Basically a no-op.
* gst/gstqueue.c: (gst_queue_init), (gst_queue_getcaps),
(gst_queue_link_sink), (gst_queue_link_src),
(gst_queue_change_state):
Allow for renegotiation while filled. Required for stream
switching while playing.
2005-01-08 Benjamin Otte <otte@gnome.org>
* gst/gstelement.c: (gst_element_link_many):

View file

@ -598,6 +598,8 @@ gst_event_new_discontinuous
gst_event_new_discontinuous_valist
gst_event_discont_get_value
gst_event_new_filler
gst_event_new_filler_stamped
gst_event_filler_get_duration
gst_event_new_flush
<SUBSECTION Standard>
GST_EVENT

View file

@ -442,6 +442,25 @@ Create a new dummy event that should be ignored
<!-- ##### FUNCTION gst_event_new_filler_stamped ##### -->
<para>
</para>
@time:
@duration:
@Returns:
<!-- ##### FUNCTION gst_event_filler_get_duration ##### -->
<para>
</para>
@event:
@Returns:
<!-- ##### MACRO gst_event_new_flush ##### -->
<para>
Create a new flush event.

View file

@ -25,6 +25,7 @@
#include "gst_private.h"
#include "gstdata_private.h"
#include "gstclock.h"
#include "gstinfo.h"
#include "gstmemchunk.h"
#include "gstevent.h"
@ -353,3 +354,72 @@ gst_event_new_segment_seek (GstSeekType type, gint64 start, gint64 stop)
return event;
}
/**
* gst_event_new_filler_stamped:
* @time: timestamp of the filler, in nanoseconds.
* @duration: duration of the filler, in nanoseconds.
*
* Creates "filler" data, which is basically empty data that is used to
* synchronize streams if one stream has no data for a while. This is
* used to prevent deadlocks.
*
* Returns: the newly created event.
*/
GstEvent *
gst_event_new_filler_stamped (guint64 time, guint64 duration)
{
GstEvent *event = gst_event_new_filler ();
GST_EVENT_TIMESTAMP (event) = time;
if (GST_CLOCK_TIME_IS_VALID (duration)) {
GValue value = { 0 };
event->event_data.structure.structure =
gst_structure_new ("application/x-gst-filler", NULL);
g_value_init (&value, G_TYPE_UINT64);
g_value_set_uint64 (&value, duration);
gst_structure_set_value (event->event_data.structure.structure,
"duration", &value);
g_value_unset (&value);
}
return event;
}
/**
* gst_event_filler_get_duration:
* @event: the event to get the duration from.
*
* Filler events are used to synchronize streams (and thereby prevent
* application deadlocks) if one stream receives no data for a while.
* This function gets the duration of a filler event, which is the
* amount of time from the start of this event (see GST_EVENT_TIMESTAMP())
* that no data is available.
*
* Returns: duration of the lack of data, or GST_CLOCK_TIME_NONE.
*/
guint64
gst_event_filler_get_duration (GstEvent * event)
{
const GValue *value;
g_return_val_if_fail (event != NULL, GST_CLOCK_TIME_NONE);
g_return_val_if_fail (GST_EVENT_TYPE (event) == GST_EVENT_FILLER,
GST_CLOCK_TIME_NONE);
/* check the event */
if (!event->event_data.structure.structure)
return GST_CLOCK_TIME_NONE;
value = gst_structure_get_value (event->event_data.structure.structure,
"duration");
if (!value)
return GST_CLOCK_TIME_NONE;
g_return_val_if_fail (G_VALUE_TYPE (value) == G_TYPE_UINT64,
GST_CLOCK_TIME_NONE);
/* return */
return g_value_get_uint64 (value);
}

View file

@ -224,6 +224,9 @@ GstEvent* gst_event_new_discontinuous_valist (gboolean new_media,
gboolean gst_event_discont_get_value (GstEvent *event, GstFormat format, gint64 *value);
#define gst_event_new_filler() gst_event_new(GST_EVENT_FILLER)
GstEvent* gst_event_new_filler_stamped (guint64 time,
guint64 duration);
guint64 gst_event_filler_get_duration (GstEvent *event);
/* flush events */
#define gst_event_new_flush() gst_event_new(GST_EVENT_FLUSH)

View file

@ -41,6 +41,24 @@ static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
GST_STATIC_CAPS_ANY);
GST_DEBUG_CATEGORY_STATIC (queue_dataflow);
#define GST_CAT_DEFAULT (queue_dataflow)
#define STATUS(queue, msg) \
GST_CAT_LOG_OBJECT (queue_dataflow, queue, \
"(%s:%s) " msg ": %u of %u-%u buffers, %u of %u-%u " \
"bytes, %" G_GUINT64_FORMAT " of %" G_GUINT64_FORMAT \
"-%" G_GUINT64_FORMAT " ns, %u elements", \
GST_DEBUG_PAD_NAME (pad), \
queue->cur_level.buffers, \
queue->min_threshold.buffers, \
queue->max_size.buffers, \
queue->cur_level.bytes, \
queue->min_threshold.bytes, \
queue->max_size.bytes, \
queue->cur_level.time, \
queue->min_threshold.time, \
queue->max_size.time, \
queue->queue->length)
static GstElementDetails gst_queue_details = GST_ELEMENT_DETAILS ("Queue",
"Generic",
@ -120,7 +138,9 @@ static gboolean gst_queue_handle_src_query (GstPad * pad,
GstQueryType type, GstFormat * fmt, gint64 * value);
static GstCaps *gst_queue_getcaps (GstPad * pad);
static GstPadLinkReturn gst_queue_link (GstPad * pad, const GstCaps * caps);
static GstPadLinkReturn
gst_queue_link_sink (GstPad * pad, const GstCaps * caps);
static GstPadLinkReturn gst_queue_link_src (GstPad * pad, const GstCaps * caps);
static void gst_queue_locked_flush (GstQueue * queue);
static GstElementStateReturn gst_queue_change_state (GstElement * element);
@ -288,7 +308,7 @@ gst_queue_init (GstQueue * queue)
GST_DEBUG_FUNCPTR (gst_queue_chain));
gst_element_add_pad (GST_ELEMENT (queue), queue->sinkpad);
gst_pad_set_link_function (queue->sinkpad,
GST_DEBUG_FUNCPTR (gst_queue_link));
GST_DEBUG_FUNCPTR (gst_queue_link_sink));
gst_pad_set_getcaps_function (queue->sinkpad,
GST_DEBUG_FUNCPTR (gst_queue_getcaps));
gst_pad_set_active (queue->sinkpad, TRUE);
@ -298,7 +318,8 @@ gst_queue_init (GstQueue * queue)
"src");
gst_pad_set_get_function (queue->srcpad, GST_DEBUG_FUNCPTR (gst_queue_get));
gst_element_add_pad (GST_ELEMENT (queue), queue->srcpad);
gst_pad_set_link_function (queue->srcpad, GST_DEBUG_FUNCPTR (gst_queue_link));
gst_pad_set_link_function (queue->srcpad,
GST_DEBUG_FUNCPTR (gst_queue_link_src));
gst_pad_set_getcaps_function (queue->srcpad,
GST_DEBUG_FUNCPTR (gst_queue_getcaps));
gst_pad_set_event_function (queue->srcpad,
@ -374,7 +395,7 @@ gst_queue_getcaps (GstPad * pad)
queue = GST_QUEUE (gst_pad_get_parent (pad));
if (queue->cur_level.bytes > 0) {
if (pad == queue->srcpad && queue->cur_level.bytes > 0) {
return gst_caps_copy (queue->negotiated_caps);
}
@ -382,7 +403,50 @@ gst_queue_getcaps (GstPad * pad)
}
static GstPadLinkReturn
gst_queue_link (GstPad * pad, const GstCaps * caps)
gst_queue_link_sink (GstPad * pad, const GstCaps * caps)
{
GstQueue *queue;
GstPadLinkReturn link_ret;
queue = GST_QUEUE (gst_pad_get_parent (pad));
if (queue->cur_level.bytes > 0) {
if (gst_caps_is_equal (caps, queue->negotiated_caps)) {
return GST_PAD_LINK_OK;
} else if (GST_STATE (queue) != GST_STATE_PLAYING) {
return GST_PAD_LINK_DELAYED;
}
/* Wait until the queue is empty before attempting the pad
negotiation. */
GST_QUEUE_MUTEX_LOCK;
STATUS (queue, "waiting for queue to get empty");
while (queue->cur_level.bytes > 0) {
g_cond_wait (queue->item_del, queue->qlock);
if (queue->interrupt) {
GST_QUEUE_MUTEX_UNLOCK;
return GST_PAD_LINK_DELAYED;
}
}
STATUS (queue, "queue is now empty");
GST_QUEUE_MUTEX_UNLOCK;
}
link_ret = gst_pad_proxy_pad_link (pad, caps);
if (GST_PAD_LINK_SUCCESSFUL (link_ret)) {
/* we store an extra copy of the negotiated caps, just in case
* the pads become unnegotiated while we have buffers */
gst_caps_replace (&queue->negotiated_caps, gst_caps_copy (caps));
}
return link_ret;
}
static GstPadLinkReturn
gst_queue_link_src (GstPad * pad, const GstCaps * caps)
{
GstQueue *queue;
GstPadLinkReturn link_ret;
@ -465,23 +529,6 @@ gst_queue_handle_pending_events (GstQueue * queue)
g_mutex_unlock (queue->event_lock);
}
#define STATUS(queue, msg) \
GST_CAT_LOG_OBJECT (queue_dataflow, queue, \
"(%s:%s) " msg ": %u of %u-%u buffers, %u of %u-%u " \
"bytes, %" G_GUINT64_FORMAT " of %" G_GUINT64_FORMAT \
"-%" G_GUINT64_FORMAT " ns, %u elements", \
GST_DEBUG_PAD_NAME (pad), \
queue->cur_level.buffers, \
queue->min_threshold.buffers, \
queue->max_size.buffers, \
queue->cur_level.bytes, \
queue->min_threshold.bytes, \
queue->max_size.bytes, \
queue->cur_level.time, \
queue->min_threshold.time, \
queue->max_size.time, \
queue->queue->length)
static void
gst_queue_chain (GstPad * pad, GstData * data)
{
@ -961,7 +1008,8 @@ gst_queue_change_state (GstElement * element)
queue = GST_QUEUE (element);
GST_CAT_LOG_OBJECT (GST_CAT_STATES, element, "starting state change");
GST_CAT_LOG_OBJECT (GST_CAT_STATES, element,
"starting state change 0x%x", GST_STATE_TRANSITION (element));
/* lock the queue so another thread (not in sync with this thread's state)
* can't call this queue's _get (or whatever)
@ -1009,6 +1057,8 @@ gst_queue_change_state (GstElement * element)
break;
}
GST_QUEUE_MUTEX_UNLOCK;
if (GST_ELEMENT_CLASS (parent_class)->change_state)
ret = GST_ELEMENT_CLASS (parent_class)->change_state (element);
@ -1018,6 +1068,10 @@ gst_queue_change_state (GstElement * element)
gst_pad_set_active (queue->sinkpad, TRUE);
gst_pad_set_active (queue->srcpad, TRUE);
GST_CAT_LOG_OBJECT (GST_CAT_STATES, element, "done with state change");
return ret;
unlock:
GST_QUEUE_MUTEX_UNLOCK;

View file

@ -41,6 +41,24 @@ static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
GST_STATIC_CAPS_ANY);
GST_DEBUG_CATEGORY_STATIC (queue_dataflow);
#define GST_CAT_DEFAULT (queue_dataflow)
#define STATUS(queue, msg) \
GST_CAT_LOG_OBJECT (queue_dataflow, queue, \
"(%s:%s) " msg ": %u of %u-%u buffers, %u of %u-%u " \
"bytes, %" G_GUINT64_FORMAT " of %" G_GUINT64_FORMAT \
"-%" G_GUINT64_FORMAT " ns, %u elements", \
GST_DEBUG_PAD_NAME (pad), \
queue->cur_level.buffers, \
queue->min_threshold.buffers, \
queue->max_size.buffers, \
queue->cur_level.bytes, \
queue->min_threshold.bytes, \
queue->max_size.bytes, \
queue->cur_level.time, \
queue->min_threshold.time, \
queue->max_size.time, \
queue->queue->length)
static GstElementDetails gst_queue_details = GST_ELEMENT_DETAILS ("Queue",
"Generic",
@ -120,7 +138,9 @@ static gboolean gst_queue_handle_src_query (GstPad * pad,
GstQueryType type, GstFormat * fmt, gint64 * value);
static GstCaps *gst_queue_getcaps (GstPad * pad);
static GstPadLinkReturn gst_queue_link (GstPad * pad, const GstCaps * caps);
static GstPadLinkReturn
gst_queue_link_sink (GstPad * pad, const GstCaps * caps);
static GstPadLinkReturn gst_queue_link_src (GstPad * pad, const GstCaps * caps);
static void gst_queue_locked_flush (GstQueue * queue);
static GstElementStateReturn gst_queue_change_state (GstElement * element);
@ -288,7 +308,7 @@ gst_queue_init (GstQueue * queue)
GST_DEBUG_FUNCPTR (gst_queue_chain));
gst_element_add_pad (GST_ELEMENT (queue), queue->sinkpad);
gst_pad_set_link_function (queue->sinkpad,
GST_DEBUG_FUNCPTR (gst_queue_link));
GST_DEBUG_FUNCPTR (gst_queue_link_sink));
gst_pad_set_getcaps_function (queue->sinkpad,
GST_DEBUG_FUNCPTR (gst_queue_getcaps));
gst_pad_set_active (queue->sinkpad, TRUE);
@ -298,7 +318,8 @@ gst_queue_init (GstQueue * queue)
"src");
gst_pad_set_get_function (queue->srcpad, GST_DEBUG_FUNCPTR (gst_queue_get));
gst_element_add_pad (GST_ELEMENT (queue), queue->srcpad);
gst_pad_set_link_function (queue->srcpad, GST_DEBUG_FUNCPTR (gst_queue_link));
gst_pad_set_link_function (queue->srcpad,
GST_DEBUG_FUNCPTR (gst_queue_link_src));
gst_pad_set_getcaps_function (queue->srcpad,
GST_DEBUG_FUNCPTR (gst_queue_getcaps));
gst_pad_set_event_function (queue->srcpad,
@ -374,7 +395,7 @@ gst_queue_getcaps (GstPad * pad)
queue = GST_QUEUE (gst_pad_get_parent (pad));
if (queue->cur_level.bytes > 0) {
if (pad == queue->srcpad && queue->cur_level.bytes > 0) {
return gst_caps_copy (queue->negotiated_caps);
}
@ -382,7 +403,50 @@ gst_queue_getcaps (GstPad * pad)
}
static GstPadLinkReturn
gst_queue_link (GstPad * pad, const GstCaps * caps)
gst_queue_link_sink (GstPad * pad, const GstCaps * caps)
{
GstQueue *queue;
GstPadLinkReturn link_ret;
queue = GST_QUEUE (gst_pad_get_parent (pad));
if (queue->cur_level.bytes > 0) {
if (gst_caps_is_equal (caps, queue->negotiated_caps)) {
return GST_PAD_LINK_OK;
} else if (GST_STATE (queue) != GST_STATE_PLAYING) {
return GST_PAD_LINK_DELAYED;
}
/* Wait until the queue is empty before attempting the pad
negotiation. */
GST_QUEUE_MUTEX_LOCK;
STATUS (queue, "waiting for queue to get empty");
while (queue->cur_level.bytes > 0) {
g_cond_wait (queue->item_del, queue->qlock);
if (queue->interrupt) {
GST_QUEUE_MUTEX_UNLOCK;
return GST_PAD_LINK_DELAYED;
}
}
STATUS (queue, "queue is now empty");
GST_QUEUE_MUTEX_UNLOCK;
}
link_ret = gst_pad_proxy_pad_link (pad, caps);
if (GST_PAD_LINK_SUCCESSFUL (link_ret)) {
/* we store an extra copy of the negotiated caps, just in case
* the pads become unnegotiated while we have buffers */
gst_caps_replace (&queue->negotiated_caps, gst_caps_copy (caps));
}
return link_ret;
}
static GstPadLinkReturn
gst_queue_link_src (GstPad * pad, const GstCaps * caps)
{
GstQueue *queue;
GstPadLinkReturn link_ret;
@ -465,23 +529,6 @@ gst_queue_handle_pending_events (GstQueue * queue)
g_mutex_unlock (queue->event_lock);
}
#define STATUS(queue, msg) \
GST_CAT_LOG_OBJECT (queue_dataflow, queue, \
"(%s:%s) " msg ": %u of %u-%u buffers, %u of %u-%u " \
"bytes, %" G_GUINT64_FORMAT " of %" G_GUINT64_FORMAT \
"-%" G_GUINT64_FORMAT " ns, %u elements", \
GST_DEBUG_PAD_NAME (pad), \
queue->cur_level.buffers, \
queue->min_threshold.buffers, \
queue->max_size.buffers, \
queue->cur_level.bytes, \
queue->min_threshold.bytes, \
queue->max_size.bytes, \
queue->cur_level.time, \
queue->min_threshold.time, \
queue->max_size.time, \
queue->queue->length)
static void
gst_queue_chain (GstPad * pad, GstData * data)
{
@ -961,7 +1008,8 @@ gst_queue_change_state (GstElement * element)
queue = GST_QUEUE (element);
GST_CAT_LOG_OBJECT (GST_CAT_STATES, element, "starting state change");
GST_CAT_LOG_OBJECT (GST_CAT_STATES, element,
"starting state change 0x%x", GST_STATE_TRANSITION (element));
/* lock the queue so another thread (not in sync with this thread's state)
* can't call this queue's _get (or whatever)
@ -1009,6 +1057,8 @@ gst_queue_change_state (GstElement * element)
break;
}
GST_QUEUE_MUTEX_UNLOCK;
if (GST_ELEMENT_CLASS (parent_class)->change_state)
ret = GST_ELEMENT_CLASS (parent_class)->change_state (element);
@ -1018,6 +1068,10 @@ gst_queue_change_state (GstElement * element)
gst_pad_set_active (queue->sinkpad, TRUE);
gst_pad_set_active (queue->srcpad, TRUE);
GST_CAT_LOG_OBJECT (GST_CAT_STATES, element, "done with state change");
return ret;
unlock:
GST_QUEUE_MUTEX_UNLOCK;