diff --git a/gst/rtpmanager/gstrtpjitterbuffer.c b/gst/rtpmanager/gstrtpjitterbuffer.c index 1500120a67..464bb09ddd 100644 --- a/gst/rtpmanager/gstrtpjitterbuffer.c +++ b/gst/rtpmanager/gstrtpjitterbuffer.c @@ -730,6 +730,7 @@ gst_rtp_jitter_buffer_init (GstRtpJitterBuffer * jitterbuffer) #define ITEM_TYPE_BUFFER 0 #define ITEM_TYPE_LOST 1 +#define ITEM_TYPE_EVENT 2 static RTPJitterBufferItem * alloc_item (gpointer data, guint type, GstClockTime dts, GstClockTime pts, @@ -1303,6 +1304,65 @@ gst_rtp_jitter_buffer_src_event (GstPad * pad, GstObject * parent, return ret; } +/* handles and stores the event in the jitterbuffer, must be called with + * LOCK */ +static gboolean +queue_event (GstRtpJitterBuffer * jitterbuffer, GstEvent * event) +{ + GstRtpJitterBufferPrivate *priv = jitterbuffer->priv; + RTPJitterBufferItem *item; + + switch (GST_EVENT_TYPE (event)) { + case GST_EVENT_CAPS: + { + GstCaps *caps; + + gst_event_parse_caps (event, &caps); + if (!gst_jitter_buffer_sink_parse_caps (jitterbuffer, caps)) + goto wrong_caps; + + break; + } + case GST_EVENT_SEGMENT: + gst_event_copy_segment (event, &priv->segment); + + /* we need time for now */ + if (priv->segment.format != GST_FORMAT_TIME) + goto newseg_wrong_format; + + GST_DEBUG_OBJECT (jitterbuffer, + "newsegment: %" GST_SEGMENT_FORMAT, &priv->segment); + break; + case GST_EVENT_EOS: + priv->eos = TRUE; + break; + default: + break; + } + + + GST_DEBUG_OBJECT (jitterbuffer, "adding event"); + item = alloc_item (event, ITEM_TYPE_EVENT, -1, -1, -1, 0, -1); + rtp_jitter_buffer_insert (priv->jbuf, item, NULL, NULL); + JBUF_SIGNAL_EVENT (priv); + + return TRUE; + + /* ERRORS */ +wrong_caps: + { + GST_DEBUG_OBJECT (jitterbuffer, "received invalid caps"); + gst_event_unref (event); + return FALSE; + } +newseg_wrong_format: + { + GST_DEBUG_OBJECT (jitterbuffer, "received non TIME newsegment"); + gst_event_unref (event); + return FALSE; + } +} + static gboolean gst_rtp_jitter_buffer_sink_event (GstPad * pad, GstObject * parent, GstEvent * event) @@ -1317,38 +1377,6 @@ gst_rtp_jitter_buffer_sink_event (GstPad * pad, GstObject * parent, GST_DEBUG_OBJECT (jitterbuffer, "received %s", GST_EVENT_TYPE_NAME (event)); switch (GST_EVENT_TYPE (event)) { - case GST_EVENT_CAPS: - { - GstCaps *caps; - - gst_event_parse_caps (event, &caps); - - JBUF_LOCK (priv); - ret = gst_jitter_buffer_sink_parse_caps (jitterbuffer, caps); - JBUF_UNLOCK (priv); - - /* set same caps on srcpad on success */ - if (ret) - ret = gst_pad_push_event (priv->srcpad, event); - else - gst_event_unref (event); - break; - } - case GST_EVENT_SEGMENT: - { - gst_event_copy_segment (event, &priv->segment); - - /* we need time for now */ - if (priv->segment.format != GST_FORMAT_TIME) - goto newseg_wrong_format; - - GST_DEBUG_OBJECT (jitterbuffer, - "newsegment: %" GST_SEGMENT_FORMAT, &priv->segment); - - /* FIXME, push SEGMENT in the queue. Sorting order might be difficult. */ - ret = gst_pad_push_event (priv->srcpad, event); - break; - } case GST_EVENT_FLUSH_START: ret = gst_pad_push_event (priv->srcpad, event); gst_rtp_jitter_buffer_flush_start (jitterbuffer); @@ -1361,43 +1389,49 @@ gst_rtp_jitter_buffer_sink_event (GstPad * pad, GstObject * parent, gst_rtp_jitter_buffer_src_activate_mode (priv->srcpad, parent, GST_PAD_MODE_PUSH, TRUE); break; - case GST_EVENT_EOS: - { - /* push EOS in queue. We always push it at the head */ - JBUF_LOCK (priv); - /* check for flushing, we need to discard the event and return FALSE when - * we are flushing */ - ret = priv->srcresult == GST_FLOW_OK; - if (ret && !priv->eos) { - GST_INFO_OBJECT (jitterbuffer, "queuing EOS"); - priv->eos = TRUE; - JBUF_SIGNAL_EVENT (priv); - } else if (priv->eos) { - GST_DEBUG_OBJECT (jitterbuffer, "dropping EOS, we are already EOS"); - } else { - GST_DEBUG_OBJECT (jitterbuffer, "dropping EOS, reason %s", - gst_flow_get_name (priv->srcresult)); - } - JBUF_UNLOCK (priv); - gst_event_unref (event); - break; - } default: - ret = gst_pad_push_event (priv->srcpad, event); + if (GST_EVENT_IS_SERIALIZED (event)) { + /* serialized events go in the queue */ + JBUF_LOCK (priv); + if (priv->srcresult != GST_FLOW_OK) { + /* Errors in sticky event pushing are no problem and ignored here + * as they will cause more meaningful errors during data flow. + * For EOS events, that are not followed by data flow, we still + * return FALSE here though. + */ + if (!GST_EVENT_IS_STICKY (event) || + GST_EVENT_TYPE (event) == GST_EVENT_EOS) + goto out_flow_error; + } + /* refuse more events on EOS */ + if (priv->eos) + goto out_eos; + ret = queue_event (jitterbuffer, event); + JBUF_UNLOCK (priv); + } else { + /* non-serialized events are forwarded downstream immediately */ + ret = gst_pad_push_event (priv->srcpad, event); + } break; } - -done: - return ret; /* ERRORS */ -newseg_wrong_format: +out_flow_error: { - GST_DEBUG_OBJECT (jitterbuffer, "received non TIME newsegment"); - ret = FALSE; + GST_DEBUG_OBJECT (jitterbuffer, + "refusing event, we have a downstream flow error: %s", + gst_flow_get_name (priv->srcresult)); + JBUF_UNLOCK (priv); gst_event_unref (event); - goto done; + return FALSE; + } +out_eos: + { + GST_DEBUG_OBJECT (jitterbuffer, "refusing event, we are EOS"); + JBUF_UNLOCK (priv); + gst_event_unref (event); + return FALSE; } } diff --git a/tests/check/elements/rtpjitterbuffer.c b/tests/check/elements/rtpjitterbuffer.c index edda166e6d..2169718c95 100644 --- a/tests/check/elements/rtpjitterbuffer.c +++ b/tests/check/elements/rtpjitterbuffer.c @@ -536,8 +536,12 @@ setup_testharness (TestData * data) gst_pad_set_caps (data->test_src_pad, generate_caps ()); gst_pad_push_event (data->test_src_pad, gst_event_new_segment (&seg)); - while ((obj = g_async_queue_try_pop (data->sink_event_queue))) - gst_mini_object_unref (obj); + obj = g_async_queue_pop (data->sink_event_queue); + gst_mini_object_unref (obj); + obj = g_async_queue_pop (data->sink_event_queue); + gst_mini_object_unref (obj); + obj = g_async_queue_pop (data->sink_event_queue); + gst_mini_object_unref (obj); } static void