rtpjitterbuffer: serialize events in the buffer

Serialize events into the jitterbuffer by inserting them with a -1
seqnum.
Update unit test to expect events from the streaming thread.

Fixes https://bugzilla.gnome.org/show_bug.cgi?id=652986
This commit is contained in:
Wim Taymans 2013-12-10 11:57:37 +01:00
parent 36e78bc5ca
commit eee515cb2c
2 changed files with 101 additions and 63 deletions

View file

@ -730,6 +730,7 @@ gst_rtp_jitter_buffer_init (GstRtpJitterBuffer * jitterbuffer)
#define ITEM_TYPE_BUFFER 0 #define ITEM_TYPE_BUFFER 0
#define ITEM_TYPE_LOST 1 #define ITEM_TYPE_LOST 1
#define ITEM_TYPE_EVENT 2
static RTPJitterBufferItem * static RTPJitterBufferItem *
alloc_item (gpointer data, guint type, GstClockTime dts, GstClockTime pts, alloc_item (gpointer data, guint type, GstClockTime dts, GstClockTime pts,
@ -1303,6 +1304,65 @@ gst_rtp_jitter_buffer_src_event (GstPad * pad, GstObject * parent,
return ret; return ret;
} }
/* handles and stores the event in the jitterbuffer, must be called with
* LOCK */
static gboolean
queue_event (GstRtpJitterBuffer * jitterbuffer, GstEvent * event)
{
GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
RTPJitterBufferItem *item;
switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_CAPS:
{
GstCaps *caps;
gst_event_parse_caps (event, &caps);
if (!gst_jitter_buffer_sink_parse_caps (jitterbuffer, caps))
goto wrong_caps;
break;
}
case GST_EVENT_SEGMENT:
gst_event_copy_segment (event, &priv->segment);
/* we need time for now */
if (priv->segment.format != GST_FORMAT_TIME)
goto newseg_wrong_format;
GST_DEBUG_OBJECT (jitterbuffer,
"newsegment: %" GST_SEGMENT_FORMAT, &priv->segment);
break;
case GST_EVENT_EOS:
priv->eos = TRUE;
break;
default:
break;
}
GST_DEBUG_OBJECT (jitterbuffer, "adding event");
item = alloc_item (event, ITEM_TYPE_EVENT, -1, -1, -1, 0, -1);
rtp_jitter_buffer_insert (priv->jbuf, item, NULL, NULL);
JBUF_SIGNAL_EVENT (priv);
return TRUE;
/* ERRORS */
wrong_caps:
{
GST_DEBUG_OBJECT (jitterbuffer, "received invalid caps");
gst_event_unref (event);
return FALSE;
}
newseg_wrong_format:
{
GST_DEBUG_OBJECT (jitterbuffer, "received non TIME newsegment");
gst_event_unref (event);
return FALSE;
}
}
static gboolean static gboolean
gst_rtp_jitter_buffer_sink_event (GstPad * pad, GstObject * parent, gst_rtp_jitter_buffer_sink_event (GstPad * pad, GstObject * parent,
GstEvent * event) GstEvent * event)
@ -1317,38 +1377,6 @@ gst_rtp_jitter_buffer_sink_event (GstPad * pad, GstObject * parent,
GST_DEBUG_OBJECT (jitterbuffer, "received %s", GST_EVENT_TYPE_NAME (event)); GST_DEBUG_OBJECT (jitterbuffer, "received %s", GST_EVENT_TYPE_NAME (event));
switch (GST_EVENT_TYPE (event)) { switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_CAPS:
{
GstCaps *caps;
gst_event_parse_caps (event, &caps);
JBUF_LOCK (priv);
ret = gst_jitter_buffer_sink_parse_caps (jitterbuffer, caps);
JBUF_UNLOCK (priv);
/* set same caps on srcpad on success */
if (ret)
ret = gst_pad_push_event (priv->srcpad, event);
else
gst_event_unref (event);
break;
}
case GST_EVENT_SEGMENT:
{
gst_event_copy_segment (event, &priv->segment);
/* we need time for now */
if (priv->segment.format != GST_FORMAT_TIME)
goto newseg_wrong_format;
GST_DEBUG_OBJECT (jitterbuffer,
"newsegment: %" GST_SEGMENT_FORMAT, &priv->segment);
/* FIXME, push SEGMENT in the queue. Sorting order might be difficult. */
ret = gst_pad_push_event (priv->srcpad, event);
break;
}
case GST_EVENT_FLUSH_START: case GST_EVENT_FLUSH_START:
ret = gst_pad_push_event (priv->srcpad, event); ret = gst_pad_push_event (priv->srcpad, event);
gst_rtp_jitter_buffer_flush_start (jitterbuffer); gst_rtp_jitter_buffer_flush_start (jitterbuffer);
@ -1361,43 +1389,49 @@ gst_rtp_jitter_buffer_sink_event (GstPad * pad, GstObject * parent,
gst_rtp_jitter_buffer_src_activate_mode (priv->srcpad, parent, gst_rtp_jitter_buffer_src_activate_mode (priv->srcpad, parent,
GST_PAD_MODE_PUSH, TRUE); GST_PAD_MODE_PUSH, TRUE);
break; break;
case GST_EVENT_EOS:
{
/* push EOS in queue. We always push it at the head */
JBUF_LOCK (priv);
/* check for flushing, we need to discard the event and return FALSE when
* we are flushing */
ret = priv->srcresult == GST_FLOW_OK;
if (ret && !priv->eos) {
GST_INFO_OBJECT (jitterbuffer, "queuing EOS");
priv->eos = TRUE;
JBUF_SIGNAL_EVENT (priv);
} else if (priv->eos) {
GST_DEBUG_OBJECT (jitterbuffer, "dropping EOS, we are already EOS");
} else {
GST_DEBUG_OBJECT (jitterbuffer, "dropping EOS, reason %s",
gst_flow_get_name (priv->srcresult));
}
JBUF_UNLOCK (priv);
gst_event_unref (event);
break;
}
default: default:
if (GST_EVENT_IS_SERIALIZED (event)) {
/* serialized events go in the queue */
JBUF_LOCK (priv);
if (priv->srcresult != GST_FLOW_OK) {
/* Errors in sticky event pushing are no problem and ignored here
* as they will cause more meaningful errors during data flow.
* For EOS events, that are not followed by data flow, we still
* return FALSE here though.
*/
if (!GST_EVENT_IS_STICKY (event) ||
GST_EVENT_TYPE (event) == GST_EVENT_EOS)
goto out_flow_error;
}
/* refuse more events on EOS */
if (priv->eos)
goto out_eos;
ret = queue_event (jitterbuffer, event);
JBUF_UNLOCK (priv);
} else {
/* non-serialized events are forwarded downstream immediately */
ret = gst_pad_push_event (priv->srcpad, event); ret = gst_pad_push_event (priv->srcpad, event);
}
break; break;
} }
done:
return ret; return ret;
/* ERRORS */ /* ERRORS */
newseg_wrong_format: out_flow_error:
{ {
GST_DEBUG_OBJECT (jitterbuffer, "received non TIME newsegment"); GST_DEBUG_OBJECT (jitterbuffer,
ret = FALSE; "refusing event, we have a downstream flow error: %s",
gst_flow_get_name (priv->srcresult));
JBUF_UNLOCK (priv);
gst_event_unref (event); gst_event_unref (event);
goto done; return FALSE;
}
out_eos:
{
GST_DEBUG_OBJECT (jitterbuffer, "refusing event, we are EOS");
JBUF_UNLOCK (priv);
gst_event_unref (event);
return FALSE;
} }
} }

View file

@ -536,7 +536,11 @@ setup_testharness (TestData * data)
gst_pad_set_caps (data->test_src_pad, generate_caps ()); gst_pad_set_caps (data->test_src_pad, generate_caps ());
gst_pad_push_event (data->test_src_pad, gst_event_new_segment (&seg)); gst_pad_push_event (data->test_src_pad, gst_event_new_segment (&seg));
while ((obj = g_async_queue_try_pop (data->sink_event_queue))) obj = g_async_queue_pop (data->sink_event_queue);
gst_mini_object_unref (obj);
obj = g_async_queue_pop (data->sink_event_queue);
gst_mini_object_unref (obj);
obj = g_async_queue_pop (data->sink_event_queue);
gst_mini_object_unref (obj); gst_mini_object_unref (obj);
} }