diff --git a/plugins/elements/gstinputselector.c b/plugins/elements/gstinputselector.c index 62a5aaa16a..9802b4ddc1 100644 --- a/plugins/elements/gstinputselector.c +++ b/plugins/elements/gstinputselector.c @@ -61,9 +61,47 @@ #include "gst/glib-compat-private.h" +#define DEBUG_CACHED_BUFFERS 0 + GST_DEBUG_CATEGORY_STATIC (input_selector_debug); #define GST_CAT_DEFAULT input_selector_debug +#define GST_TYPE_INPUT_SELECTOR_SYNC_MODE (gst_input_selector_sync_mode_get_type()) +static GType +gst_input_selector_sync_mode_get_type (void) +{ + static GType type = 0; + static const GEnumValue data[] = { + {GST_INPUT_SELECTOR_SYNC_MODE_ACTIVE_SEGMENT, + "Sync using the current active segment", + "active-segment"}, + {GST_INPUT_SELECTOR_SYNC_MODE_CLOCK, "Sync using the clock", "clock"}, + {0, NULL, NULL}, + }; + + if (!type) { + type = g_enum_register_static ("GstInputSelectorSyncMode", data); + } + return type; +} + +#if GLIB_CHECK_VERSION(2, 26, 0) +#define NOTIFY_MUTEX_LOCK() +#define NOTIFY_MUTEX_UNLOCK() +#else +static GStaticRecMutex notify_mutex = G_STATIC_REC_MUTEX_INIT; +#define NOTIFY_MUTEX_LOCK() g_static_rec_mutex_lock (¬ify_mutex) +#define NOTIFY_MUTEX_UNLOCK() g_static_rec_mutex_unlock (¬ify_mutex) +#endif + +#define GST_INPUT_SELECTOR_GET_LOCK(sel) (&((GstInputSelector*)(sel))->lock) +#define GST_INPUT_SELECTOR_GET_COND(sel) (&((GstInputSelector*)(sel))->cond) +#define GST_INPUT_SELECTOR_LOCK(sel) (g_mutex_lock (GST_INPUT_SELECTOR_GET_LOCK(sel))) +#define GST_INPUT_SELECTOR_UNLOCK(sel) (g_mutex_unlock (GST_INPUT_SELECTOR_GET_LOCK(sel))) +#define GST_INPUT_SELECTOR_WAIT(sel) (g_cond_wait (GST_INPUT_SELECTOR_GET_COND(sel), \ + GST_INPUT_SELECTOR_GET_LOCK(sel))) +#define GST_INPUT_SELECTOR_BROADCAST(sel) (g_cond_broadcast (GST_INPUT_SELECTOR_GET_COND(sel))) + static GstStaticPadTemplate gst_input_selector_sink_factory = GST_STATIC_PAD_TEMPLATE ("sink_%u", GST_PAD_SINK, @@ -81,11 +119,14 @@ enum PROP_0, PROP_N_PADS, PROP_ACTIVE_PAD, - PROP_SYNC_STREAMS + PROP_SYNC_STREAMS, + PROP_SYNC_MODE, + PROP_CACHE_BUFFERS }; #define DEFAULT_SYNC_STREAMS TRUE - +#define DEFAULT_SYNC_MODE GST_INPUT_SELECTOR_SYNC_MODE_ACTIVE_SEGMENT +#define DEFAULT_CACHE_BUFFERS FALSE #define DEFAULT_PAD_ALWAYS_OK TRUE enum @@ -106,6 +147,8 @@ enum }; static guint gst_input_selector_signals[LAST_SIGNAL] = { 0 }; +static void gst_input_selector_active_pad_changed (GstInputSelector * sel, + GParamSpec * pspec, gpointer user_data); static inline gboolean gst_input_selector_is_active_sinkpad (GstInputSelector * sel, GstPad * pad); static GstPad *gst_input_selector_activate_sinkpad (GstInputSelector * sel, @@ -128,6 +171,7 @@ static GstPad *gst_input_selector_get_linked_pad (GstInputSelector * sel, typedef struct _GstSelectorPad GstSelectorPad; typedef struct _GstSelectorPadClass GstSelectorPadClass; +typedef struct _GstSelectorPadCachedBuffer GstSelectorPadCachedBuffer; struct _GstSelectorPad { @@ -147,6 +191,15 @@ struct _GstSelectorPad guint32 segment_seqnum; /* sequence number of the current segment */ gboolean events_pending; /* TRUE if sticky events need to be updated */ + + gboolean sending_cached_buffers; + GQueue *cached_buffers; +}; + +struct _GstSelectorPadCachedBuffer +{ + GstBuffer *buffer; + GstSegment segment; }; struct _GstSelectorPadClass @@ -171,6 +224,9 @@ static GstIterator *gst_selector_pad_iterate_linked_pads (GstPad * pad, GstObject * parent); static GstFlowReturn gst_selector_pad_chain (GstPad * pad, GstObject * parent, GstBuffer * buf); +static void gst_selector_pad_cache_buffer (GstSelectorPad * selpad, + GstBuffer * buffer); +static void gst_selector_pad_free_cached_buffers (GstSelectorPad * selpad); G_DEFINE_TYPE (GstSelectorPad, gst_selector_pad, GST_TYPE_PAD); @@ -221,6 +277,7 @@ gst_selector_pad_finalize (GObject * object) if (pad->tags) gst_tag_list_unref (pad->tags); + gst_selector_pad_free_cached_buffers (pad); G_OBJECT_CLASS (gst_selector_pad_parent_class)->finalize (object); } @@ -293,12 +350,13 @@ gst_selector_pad_get_running_time (GstSelectorPad * pad) } GST_OBJECT_UNLOCK (pad); - GST_DEBUG_OBJECT (pad, "running time: %" GST_TIME_FORMAT, - GST_TIME_ARGS (ret)); + GST_DEBUG_OBJECT (pad, "running time: %" GST_TIME_FORMAT + " segment: %" GST_SEGMENT_FORMAT, GST_TIME_ARGS (ret), &pad->segment); return ret; } +/* must be called with the SELECTOR_LOCK */ static void gst_selector_pad_reset (GstSelectorPad * pad) { @@ -312,9 +370,55 @@ gst_selector_pad_reset (GstSelectorPad * pad) pad->flushing = FALSE; pad->position = GST_CLOCK_TIME_NONE; gst_segment_init (&pad->segment, GST_FORMAT_UNDEFINED); + pad->sending_cached_buffers = FALSE; + gst_selector_pad_free_cached_buffers (pad); GST_OBJECT_UNLOCK (pad); } +static GstSelectorPadCachedBuffer * +gst_selector_pad_new_cached_buffer (GstSelectorPad * selpad, GstBuffer * buffer) +{ + GstSelectorPadCachedBuffer *cached_buffer = + g_slice_new (GstSelectorPadCachedBuffer); + cached_buffer->buffer = buffer; + cached_buffer->segment = selpad->segment; + return cached_buffer; +} + +static void +gst_selector_pad_free_cached_buffer (GstSelectorPadCachedBuffer * cached_buffer) +{ + gst_buffer_unref (cached_buffer->buffer); + g_slice_free (GstSelectorPadCachedBuffer, cached_buffer); +} + +/* must be called with the SELECTOR_LOCK */ +static void +gst_selector_pad_cache_buffer (GstSelectorPad * selpad, GstBuffer * buffer) +{ + GST_DEBUG_OBJECT (selpad, "Caching buffer %p", buffer); + if (!selpad->cached_buffers) + selpad->cached_buffers = g_queue_new (); + g_queue_push_tail (selpad->cached_buffers, + gst_selector_pad_new_cached_buffer (selpad, buffer)); +} + +/* must be called with the SELECTOR_LOCK */ +static void +gst_selector_pad_free_cached_buffers (GstSelectorPad * selpad) +{ + GstSelectorPadCachedBuffer *cached_buffer; + + if (!selpad->cached_buffers) + return; + + GST_DEBUG_OBJECT (selpad, "Freeing cached buffers"); + while ((cached_buffer = g_queue_pop_head (selpad->cached_buffers))) + gst_selector_pad_free_cached_buffer (cached_buffer); + g_queue_free (selpad->cached_buffers); + selpad->cached_buffers = NULL; +} + /* strictly get the linked pad from the sinkpad. If the pad is active we return * the srcpad else we return NULL */ static GstIterator * @@ -351,36 +455,35 @@ gst_selector_pad_event (GstPad * pad, GstObject * parent, GstEvent * event) sel = GST_INPUT_SELECTOR (parent); selpad = GST_SELECTOR_PAD_CAST (pad); + GST_DEBUG_OBJECT (selpad, "received event %" GST_PTR_FORMAT, event); GST_INPUT_SELECTOR_LOCK (sel); prev_active_sinkpad = sel->active_sinkpad; active_sinkpad = gst_input_selector_activate_sinkpad (sel, pad); - - /* only forward if we are dealing with the active sinkpad */ - forward = (pad == active_sinkpad); GST_INPUT_SELECTOR_UNLOCK (sel); if (prev_active_sinkpad != active_sinkpad && pad == active_sinkpad) { g_object_notify (G_OBJECT (sel), "active-pad"); } + GST_INPUT_SELECTOR_LOCK (sel); + active_sinkpad = gst_input_selector_activate_sinkpad (sel, pad); + + /* only forward if we are dealing with the active sinkpad */ + forward = (pad == active_sinkpad); + switch (GST_EVENT_TYPE (event)) { case GST_EVENT_FLUSH_START: /* Unblock the pad if it's waiting */ - GST_INPUT_SELECTOR_LOCK (sel); selpad->flushing = TRUE; GST_INPUT_SELECTOR_BROADCAST (sel); - GST_INPUT_SELECTOR_UNLOCK (sel); break; case GST_EVENT_FLUSH_STOP: - GST_INPUT_SELECTOR_LOCK (sel); gst_selector_pad_reset (selpad); - GST_INPUT_SELECTOR_UNLOCK (sel); + GST_INPUT_SELECTOR_BROADCAST (sel); break; case GST_EVENT_SEGMENT: { - GST_INPUT_SELECTOR_LOCK (sel); - GST_OBJECT_LOCK (selpad); gst_event_copy_segment (event, &selpad->segment); selpad->segment_seqnum = gst_event_get_seqnum (event); @@ -400,9 +503,6 @@ gst_selector_pad_event (GstPad * pad, GstObject * parent, GstEvent * event) } GST_DEBUG_OBJECT (pad, "configured SEGMENT %" GST_SEGMENT_FORMAT, &selpad->segment); - - GST_OBJECT_UNLOCK (selpad); - GST_INPUT_SELECTOR_UNLOCK (sel); break; } case GST_EVENT_TAG: @@ -411,7 +511,6 @@ gst_selector_pad_event (GstPad * pad, GstObject * parent, GstEvent * event) gst_event_parse_tag (event, &tags); - GST_OBJECT_LOCK (selpad); oldtags = selpad->tags; newtags = gst_tag_list_merge (oldtags, tags, GST_TAG_MERGE_REPLACE); @@ -419,7 +518,6 @@ gst_selector_pad_event (GstPad * pad, GstObject * parent, GstEvent * event) if (oldtags) gst_tag_list_unref (oldtags); GST_DEBUG_OBJECT (pad, "received tags %" GST_PTR_FORMAT, newtags); - GST_OBJECT_UNLOCK (selpad); g_object_notify (G_OBJECT (selpad), "tags"); break; @@ -430,7 +528,7 @@ gst_selector_pad_event (GstPad * pad, GstObject * parent, GstEvent * event) if (forward) { selpad->eos_sent = TRUE; } else { - GstSelectorPad *tmp; + GstSelectorPad *active_selpad; /* If the active sinkpad is in EOS state but EOS * was not sent downstream this means that the pad @@ -438,18 +536,16 @@ gst_selector_pad_event (GstPad * pad, GstObject * parent, GstEvent * event) * the previously active pad got EOS after it was * active */ - GST_INPUT_SELECTOR_LOCK (sel); - active_sinkpad = gst_input_selector_activate_sinkpad (sel, pad); - tmp = GST_SELECTOR_PAD (active_sinkpad); - forward = (tmp->eos && !tmp->eos_sent); - tmp->eos_sent = TRUE; - GST_INPUT_SELECTOR_UNLOCK (sel); + active_selpad = GST_SELECTOR_PAD (active_sinkpad); + forward = (active_selpad->eos && !active_selpad->eos_sent); + active_selpad->eos_sent = TRUE; } GST_DEBUG_OBJECT (pad, "received EOS"); break; default: break; } + GST_INPUT_SELECTOR_UNLOCK (sel); if (forward) { GST_DEBUG_OBJECT (pad, "forwarding event"); res = gst_pad_push_event (sel->srcpad, event); @@ -495,103 +591,107 @@ gst_input_selector_wait (GstInputSelector * self, GstSelectorPad * pad) return self->flushing; } -/* must be called with the SELECTOR_LOCK, will block until the running time +/* must be called without the SELECTOR_LOCK, will wait until the running time * of the active pad is after this pad or return TRUE when flushing */ static gboolean gst_input_selector_wait_running_time (GstInputSelector * sel, - GstSelectorPad * pad, GstBuffer * buf) + GstSelectorPad * selpad, GstBuffer * buf) { - GstPad *active_sinkpad; - GstSelectorPad *active_selpad; - GstSegment *seg, *active_seg; - GstClockTime running_time, active_running_time = GST_CLOCK_TIME_NONE; + GstSegment *seg; - seg = &pad->segment; - - active_sinkpad = - gst_input_selector_activate_sinkpad (sel, GST_PAD_CAST (pad)); - active_selpad = GST_SELECTOR_PAD_CAST (active_sinkpad); - active_seg = &active_selpad->segment; - - /* We can only sync if the segments are in time format or - * if the active pad had no newsegment event yet */ - if (seg->format != GST_FORMAT_TIME || - (active_seg->format != GST_FORMAT_TIME - && active_seg->format != GST_FORMAT_UNDEFINED)) - return FALSE; + GST_DEBUG_OBJECT (selpad, "entering wait for buffer %p", buf); /* If we have no valid timestamp we can't sync this buffer */ - if (!GST_BUFFER_TIMESTAMP_IS_VALID (buf)) + if (!GST_BUFFER_TIMESTAMP_IS_VALID (buf)) { + GST_DEBUG_OBJECT (selpad, "leaving wait for buffer with " + "invalid timestamp"); return FALSE; + } - running_time = GST_BUFFER_TIMESTAMP (buf); - /* If possible try to get the running time at the end of the buffer */ - if (GST_BUFFER_DURATION_IS_VALID (buf)) - running_time += GST_BUFFER_DURATION (buf); - if (running_time > seg->stop) - running_time = seg->stop; - running_time = - gst_segment_to_running_time (seg, GST_FORMAT_TIME, running_time); - /* If this is outside the segment don't sync */ - if (running_time == -1) - return FALSE; - - /* Get active pad's running time, if no configured segment yet keep at -1 */ - if (active_seg->format == GST_FORMAT_TIME) - active_running_time = - gst_segment_to_running_time (active_seg, GST_FORMAT_TIME, - active_selpad->position); + seg = &selpad->segment; /* Wait until * a) this is the active pad * b) the pad or the selector is flushing * c) the selector is not blocked - * d) the active pad has no running time or the active - * pad's running time is before this running time - * e) the active pad has a non-time segment - * f) the active pad changed and has not pushed anything + * d) the buffer running time is before the current running time + * (either active-seg or clock, depending on sync-mode) */ - while (pad != active_selpad && !sel->flushing && !pad->flushing - && active_selpad->pushed && (sel->blocked || active_running_time == -1 - || running_time >= active_running_time)) { - if (!sel->blocked) - GST_DEBUG_OBJECT (pad, - "Waiting for active streams to advance. %" GST_TIME_FORMAT " >= %" - GST_TIME_FORMAT, GST_TIME_ARGS (running_time), - GST_TIME_ARGS (active_running_time)); - GST_INPUT_SELECTOR_WAIT (sel); + GST_INPUT_SELECTOR_LOCK (sel); + while (TRUE) { + GstPad *active_sinkpad; + GstSelectorPad *active_selpad; + GstClock *clock; + gint64 cur_running_time; + GstClockTime running_time; - /* Get new active pad, it might have changed */ active_sinkpad = - gst_input_selector_activate_sinkpad (sel, GST_PAD_CAST (pad)); + gst_input_selector_activate_sinkpad (sel, GST_PAD_CAST (selpad)); active_selpad = GST_SELECTOR_PAD_CAST (active_sinkpad); - active_seg = &active_selpad->segment; - /* If the active segment is configured but not to time format - * we can't do any syncing at all */ - if (active_seg->format != GST_FORMAT_TIME - && active_seg->format != GST_FORMAT_UNDEFINED) + running_time = GST_BUFFER_TIMESTAMP (buf); + /* If possible try to get the running time at the end of the buffer */ + if (GST_BUFFER_DURATION_IS_VALID (buf)) + running_time += GST_BUFFER_DURATION (buf); + /* Only use the segment to convert to running time if the segment is + * in TIME format, otherwise do our best to try to sync */ + if (seg->format == GST_FORMAT_TIME && GST_CLOCK_TIME_IS_VALID (seg->stop)) { + if (running_time > seg->stop) { + running_time = seg->stop; + } + running_time = + gst_segment_to_running_time (seg, GST_FORMAT_TIME, running_time); + /* If this is outside the segment don't sync */ + if (running_time == -1) { + GST_INPUT_SELECTOR_UNLOCK (sel); + return FALSE; + } + } + + cur_running_time = GST_CLOCK_TIME_NONE; + if (sel->sync_mode == GST_INPUT_SELECTOR_SYNC_MODE_CLOCK) { + clock = gst_element_get_clock (GST_ELEMENT_CAST (sel)); + if (clock) { + GstClockTime base_time; + + cur_running_time = gst_clock_get_time (clock); + base_time = gst_element_get_base_time (GST_ELEMENT_CAST (sel)); + if (base_time <= cur_running_time) + cur_running_time -= base_time; + else + cur_running_time = 0; + } + } else { + GstSegment *active_seg; + + active_seg = &active_selpad->segment; + + /* Get active pad's running time, if no configured segment yet keep at -1 */ + if (active_seg->format == GST_FORMAT_TIME) + cur_running_time = gst_segment_to_running_time (active_seg, + GST_FORMAT_TIME, active_seg->position); + } + + if (selpad != active_selpad && !sel->flushing && !selpad->flushing && + (sel->cache_buffers || active_selpad->pushed) && + (sel->blocked || cur_running_time == -1 + || running_time >= cur_running_time)) { + if (!sel->blocked) + GST_DEBUG_OBJECT (selpad, + "Waiting for active streams to advance. %" GST_TIME_FORMAT " >= %" + GST_TIME_FORMAT, GST_TIME_ARGS (running_time), + GST_TIME_ARGS (cur_running_time)); + + GST_INPUT_SELECTOR_WAIT (sel); + } else { + GST_INPUT_SELECTOR_UNLOCK (sel); break; - - /* Get the new active pad running time */ - if (active_seg->format == GST_FORMAT_TIME) - active_running_time = - gst_segment_to_running_time (active_seg, GST_FORMAT_TIME, - active_selpad->position); - else - active_running_time = -1; - - if (!sel->blocked) - GST_DEBUG_OBJECT (pad, - "Waited for active streams to advance. %" GST_TIME_FORMAT " >= %" - GST_TIME_FORMAT, GST_TIME_ARGS (running_time), - GST_TIME_ARGS (active_running_time)); - + } } /* Return TRUE if the selector or the pad is flushing */ - return (sel->flushing || pad->flushing); + return (sel->flushing || selpad->flushing); } static gboolean @@ -614,6 +714,165 @@ forward_sticky_events (GstPad * sinkpad, GstEvent ** event, gpointer user_data) return TRUE; } +#if DEBUG_CACHED_BUFFERS +static void +gst_input_selector_debug_cached_buffers (GstInputSelector * sel) +{ + GList *walk; + + for (walk = GST_ELEMENT_CAST (sel)->sinkpads; walk; walk = g_list_next (walk)) { + GstSelectorPad *selpad; + GString *timestamps; + gchar *str; + int i; + + selpad = GST_SELECTOR_PAD_CAST (walk->data); + if (!selpad->cached_buffers) { + GST_DEBUG_OBJECT (selpad, "Cached buffers timestamps: "); + continue; + } + + timestamps = g_string_new ("Cached buffers timestamps:"); + for (i = 0; i < selpad->cached_buffers->length; ++i) { + GstSelectorPadCachedBuffer *cached_buffer; + + cached_buffer = g_queue_peek_nth (selpad->cached_buffers, i); + g_string_append_printf (timestamps, " %" GST_TIME_FORMAT, + GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (cached_buffer->buffer))); + } + str = g_string_free (timestamps, FALSE); + GST_DEBUG_OBJECT (selpad, str); + g_free (str); + } +} +#endif + +/* must be called with the SELECTOR_LOCK */ +static void +gst_input_selector_cleanup_old_cached_buffers (GstInputSelector * sel, + GstPad * pad) +{ + GstSelectorPad *selpad; + GstSegment *seg; + GstClock *clock; + gint64 cur_running_time; + GList *walk; + + selpad = GST_SELECTOR_PAD_CAST (pad); + seg = &selpad->segment; + + cur_running_time = GST_CLOCK_TIME_NONE; + if (sel->sync_mode == GST_INPUT_SELECTOR_SYNC_MODE_CLOCK) { + clock = gst_element_get_clock (GST_ELEMENT_CAST (sel)); + if (clock) { + GstClockTime base_time; + + cur_running_time = gst_clock_get_time (clock); + base_time = gst_element_get_base_time (GST_ELEMENT_CAST (sel)); + if (base_time <= cur_running_time) + cur_running_time -= base_time; + else + cur_running_time = 0; + } + } else { + GstPad *active_sinkpad; + GstSelectorPad *active_selpad; + GstSegment *active_seg; + + active_sinkpad = gst_input_selector_activate_sinkpad (sel, pad); + active_selpad = GST_SELECTOR_PAD_CAST (active_sinkpad); + active_seg = &active_selpad->segment; + + /* Get active pad's running time, if no configured segment yet keep at -1 */ + if (active_seg->format == GST_FORMAT_TIME) + cur_running_time = gst_segment_to_running_time (active_seg, + GST_FORMAT_TIME, active_seg->position); + } + + if (!GST_CLOCK_TIME_IS_VALID (cur_running_time)) + return; + + GST_DEBUG_OBJECT (sel, "Cleaning up old cached buffers"); + for (walk = GST_ELEMENT_CAST (sel)->sinkpads; walk; walk = g_list_next (walk)) { + GstSelectorPad *selpad = GST_SELECTOR_PAD_CAST (walk->data); + GstSelectorPadCachedBuffer *cached_buffer; + GSList *maybe_remove; + guint queue_position; + + if (!selpad->cached_buffers) + continue; + + maybe_remove = NULL; + queue_position = 0; + while ((cached_buffer = g_queue_peek_nth (selpad->cached_buffers, + queue_position))) { + GstBuffer *buffer = cached_buffer->buffer; + GstClockTime running_time; + GSList *l; + + /* If we have no valid timestamp we can't sync this buffer */ + if (!GST_BUFFER_TIMESTAMP_IS_VALID (buffer)) { + maybe_remove = g_slist_append (maybe_remove, cached_buffer); + queue_position = g_slist_length (maybe_remove); + continue; + } + + /* the buffer is still valid if its duration is valid and the + * timestamp + duration is >= time, or if its duration is invalid + * and the timestamp is >= time */ + running_time = GST_BUFFER_TIMESTAMP (buffer); + /* If possible try to get the running time at the end of the buffer */ + if (GST_BUFFER_DURATION_IS_VALID (buffer)) + running_time += GST_BUFFER_DURATION (buffer); + /* Only use the segment to convert to running time if the segment is + * in TIME format, otherwise do our best to try to sync */ + if (seg->format == GST_FORMAT_TIME && GST_CLOCK_TIME_IS_VALID (seg->stop)) { + if (running_time > seg->stop) { + running_time = seg->stop; + } + running_time = + gst_segment_to_running_time (seg, GST_FORMAT_TIME, running_time); + } + + GST_DEBUG_OBJECT (selpad, + "checking if buffer %p running time=%" GST_TIME_FORMAT + " >= stream time=%" GST_TIME_FORMAT, buffer, + GST_TIME_ARGS (running_time), GST_TIME_ARGS (cur_running_time)); + if (running_time >= cur_running_time) { + break; + } + + GST_DEBUG_OBJECT (selpad, "Removing old cached buffer %p", buffer); + g_queue_pop_nth (selpad->cached_buffers, queue_position); + gst_selector_pad_free_cached_buffer (cached_buffer); + + for (l = maybe_remove; l != NULL; l = g_slist_next (l)) { + /* A buffer after some invalid buffers was removed, it means the invalid buffers + * are old, lets also remove them */ + cached_buffer = l->data; + g_queue_remove (selpad->cached_buffers, cached_buffer); + gst_selector_pad_free_cached_buffer (cached_buffer); + } + + g_slist_free (maybe_remove); + maybe_remove = NULL; + queue_position = 0; + } + + g_slist_free (maybe_remove); + maybe_remove = NULL; + + if (g_queue_is_empty (selpad->cached_buffers)) { + g_queue_free (selpad->cached_buffers); + selpad->cached_buffers = NULL; + } + } + +#if DEBUG_CACHED_BUFFERS + gst_input_selector_debug_cached_buffers (sel); +#endif +} + static GstFlowReturn gst_selector_pad_chain (GstPad * pad, GstObject * parent, GstBuffer * buf) { @@ -627,10 +886,16 @@ gst_selector_pad_chain (GstPad * pad, GstObject * parent, GstBuffer * buf) sel = GST_INPUT_SELECTOR (parent); selpad = GST_SELECTOR_PAD_CAST (pad); + GST_DEBUG_OBJECT (selpad, + "entering chain for buf %p with timestamp %" GST_TIME_FORMAT, buf, + GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buf))); + GST_INPUT_SELECTOR_LOCK (sel); /* wait or check for flushing */ - if (gst_input_selector_wait (sel, selpad)) + if (gst_input_selector_wait (sel, selpad)) { + GST_INPUT_SELECTOR_UNLOCK (sel); goto flushing; + } GST_LOG_OBJECT (pad, "getting active pad"); @@ -639,13 +904,57 @@ gst_selector_pad_chain (GstPad * pad, GstObject * parent, GstBuffer * buf) /* In sync mode wait until the active pad has advanced * after the running time of the current buffer */ - if (sel->sync_streams && active_sinkpad != pad) { - if (gst_input_selector_wait_running_time (sel, selpad, buf)) - goto flushing; - } + if (sel->sync_streams) { + /* call chain for each cached buffer if we are not the active pad + * or if we are the active pad but didn't push anything yet. */ + if (active_sinkpad != pad || !selpad->pushed) { + /* no need to check for sel->cache_buffers as selpad->cached_buffers + * will only be valid if cache_buffers is TRUE */ + if (selpad->cached_buffers && !selpad->sending_cached_buffers) { + GstSelectorPadCachedBuffer *cached_buffer; + GstSegment saved_segment; - /* Might have changed while waiting */ - active_sinkpad = gst_input_selector_activate_sinkpad (sel, pad); + saved_segment = selpad->segment; + + selpad->sending_cached_buffers = TRUE; + while (!sel->flushing && !selpad->flushing && + (cached_buffer = g_queue_pop_head (selpad->cached_buffers))) { + GST_DEBUG_OBJECT (pad, "Cached buffers found, " + "invoking chain for cached buffer %p", cached_buffer->buffer); + + selpad->segment = cached_buffer->segment; + selpad->events_pending = TRUE; + GST_INPUT_SELECTOR_UNLOCK (sel); + gst_selector_pad_chain (pad, parent, cached_buffer->buffer); + GST_INPUT_SELECTOR_LOCK (sel); + + /* we may have cleaned up the queue in the meantime because of + * old buffers */ + if (!selpad->cached_buffers) { + break; + } + } + selpad->sending_cached_buffers = FALSE; + + /* all cached buffers sent, restore segment for current buffer */ + selpad->segment = saved_segment; + selpad->events_pending = TRUE; + + /* Might have changed while calling chain for cached buffers */ + active_sinkpad = gst_input_selector_activate_sinkpad (sel, pad); + } + } + + if (active_sinkpad != pad) { + GST_INPUT_SELECTOR_UNLOCK (sel); + if (gst_input_selector_wait_running_time (sel, selpad, buf)) + goto flushing; + GST_INPUT_SELECTOR_LOCK (sel); + } + + /* Might have changed while waiting */ + active_sinkpad = gst_input_selector_activate_sinkpad (sel, pad); + } /* update the segment on the srcpad */ start_time = GST_BUFFER_TIMESTAMP (buf); @@ -693,10 +1002,29 @@ gst_selector_pad_chain (GstPad * pad, GstObject * parent, GstBuffer * buf) } /* forward */ - GST_LOG_OBJECT (pad, "Forwarding buffer %p", buf); + GST_LOG_OBJECT (pad, "Forwarding buffer %p with timestamp %" GST_TIME_FORMAT, + buf, GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buf))); - res = gst_pad_push (sel->srcpad, buf); - selpad->pushed = TRUE; + res = gst_pad_push (sel->srcpad, gst_buffer_ref (buf)); + GST_LOG_OBJECT (pad, "Buffer %p forwarded result=%d", buf, res); + + GST_INPUT_SELECTOR_LOCK (sel); + + if (sel->sync_streams && sel->cache_buffers) { + /* Might have changed while pushing */ + active_sinkpad = gst_input_selector_activate_sinkpad (sel, pad); + /* only set pad to pushed if we are still the active pad */ + if (active_sinkpad == pad) + selpad->pushed = TRUE; + + /* cache buffer as we may need it again if we change pads */ + gst_selector_pad_cache_buffer (selpad, buf); + gst_input_selector_cleanup_old_cached_buffers (sel, pad); + } else { + selpad->pushed = TRUE; + gst_buffer_unref (buf); + } + GST_INPUT_SELECTOR_UNLOCK (sel); done: return res; @@ -706,7 +1034,8 @@ ignore: { gboolean active_pad_pushed = GST_SELECTOR_PAD_CAST (active_sinkpad)->pushed; - GST_DEBUG_OBJECT (pad, "Pad not active, discard buffer %p", buf); + GST_DEBUG_OBJECT (pad, "Pad not active or buffer timestamp is invalid, " + "discard buffer %p", buf); /* when we drop a buffer, we're creating a discont on this pad */ selpad->discont = TRUE; GST_INPUT_SELECTOR_UNLOCK (sel); @@ -725,7 +1054,6 @@ ignore: flushing: { GST_DEBUG_OBJECT (pad, "We are flushing, discard buffer %p", buf); - GST_INPUT_SELECTOR_UNLOCK (sel); gst_buffer_unref (buf); res = GST_FLOW_FLUSHING; goto done; @@ -818,16 +1146,59 @@ gst_input_selector_class_init (GstInputSelectorClass * klass) * GstInputSelector:sync-streams * * If set to %TRUE all inactive streams will be synced to the - * running time of the active stream. This makes sure that no - * buffers are dropped by input-selector that might be needed - * when switching the active pad. + * running time of the active stream or to the current clock. + * + * To make sure no buffers are dropped by input-selector + * that might be needed when switching the active pad, + * sync-mode should be set to "clock" and cache-buffers to TRUE. * * Since: 0.10.36 */ g_object_class_install_property (gobject_class, PROP_SYNC_STREAMS, g_param_spec_boolean ("sync-streams", "Sync Streams", - "Synchronize inactive streams to the running time of the active stream", - DEFAULT_SYNC_STREAMS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + "Synchronize inactive streams to the running time of the active " + "stream or to the current clock", + DEFAULT_SYNC_STREAMS, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | + GST_PARAM_MUTABLE_READY)); + + /** + * GstInputSelector:sync-mode + * + * Select how input-selector will sync buffers when in sync-streams mode. + * + * Note that when using the "active-segment" mode, the "active-segment" may + * be ahead of current clock time when switching the active pad, as the current + * active pad may have pushed more buffers than what was displayed/consumed, + * which may cause delays and some missing buffers. + * + * Since: 0.10.36 + */ + g_object_class_install_property (gobject_class, PROP_SYNC_MODE, + g_param_spec_enum ("sync-mode", "Sync mode", + "Behavior in sync-streams mode", GST_TYPE_INPUT_SELECTOR_SYNC_MODE, + DEFAULT_SYNC_MODE, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | + GST_PARAM_MUTABLE_READY)); + + /** + * GstInputSelector:cache-buffers + * + * If set to %TRUE and GstInputSelector:sync-streams is also set to %TRUE, + * the active pad will cache the buffers still considered valid (after current + * running time, see sync-mode) to avoid missing frames if/when the pad is + * reactivated. + * + * The active pad may push more buffers than what is currently displayed/consumed + * and when changing pads those buffers will be discarded and the only way to + * reactivate that pad without loosing the already consumed buffers is to enable cache. + */ + g_object_class_install_property (gobject_class, PROP_CACHE_BUFFERS, + g_param_spec_boolean ("cache-buffers", "Cache Buffers", + "Cache buffers for active-pad", + DEFAULT_CACHE_BUFFERS, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | + GST_PARAM_MUTABLE_READY)); /** * GstInputSelector::block: @@ -880,6 +1251,12 @@ gst_input_selector_init (GstInputSelector * sel) g_mutex_init (&sel->lock); g_cond_init (&sel->cond); sel->blocked = FALSE; + + /* lets give a change for downstream to do something on + * active-pad change before we start pushing new buffers */ + g_signal_connect_data (sel, "notify::active-pad", + (GCallback) gst_input_selector_active_pad_changed, NULL, + NULL, G_CONNECT_AFTER); } static void @@ -934,13 +1311,6 @@ gst_input_selector_set_active_pad (GstInputSelector * self, GstPad * pad) active_pad_p = &self->active_sinkpad; gst_object_replace ((GstObject **) active_pad_p, GST_OBJECT_CAST (pad)); - gst_pad_push_event (pad, gst_event_new_reconfigure ()); - - /* Wake up all non-active pads in sync mode, they might be - * the active pad now */ - if (self->sync_streams) - GST_INPUT_SELECTOR_BROADCAST (self); - GST_DEBUG_OBJECT (self, "New active pad is %" GST_PTR_FORMAT, self->active_sinkpad); @@ -961,23 +1331,51 @@ gst_input_selector_set_property (GObject * object, guint prop_id, pad = g_value_get_object (value); GST_INPUT_SELECTOR_LOCK (sel); + +#if DEBUG_CACHED_BUFFERS + gst_input_selector_debug_cached_buffers (sel); +#endif + gst_input_selector_set_active_pad (sel, pad); + +#if DEBUG_CACHED_BUFFERS + gst_input_selector_debug_cached_buffers (sel); +#endif + GST_INPUT_SELECTOR_UNLOCK (sel); break; } case PROP_SYNC_STREAMS: - { GST_INPUT_SELECTOR_LOCK (sel); sel->sync_streams = g_value_get_boolean (value); GST_INPUT_SELECTOR_UNLOCK (sel); break; - } + case PROP_SYNC_MODE: + GST_INPUT_SELECTOR_LOCK (sel); + sel->sync_mode = g_value_get_enum (value); + GST_INPUT_SELECTOR_UNLOCK (sel); + break; + case PROP_CACHE_BUFFERS: + GST_INPUT_SELECTOR_LOCK (object); + sel->cache_buffers = g_value_get_boolean (value); + GST_INPUT_SELECTOR_UNLOCK (object); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; } } +static void +gst_input_selector_active_pad_changed (GstInputSelector * sel, + GParamSpec * pspec, gpointer user_data) +{ + /* Wake up all non-active pads in sync mode, they might be + * the active pad now */ + if (sel->sync_streams) + GST_INPUT_SELECTOR_BROADCAST (sel); +} + static void gst_input_selector_get_property (GObject * object, guint prop_id, GValue * value, GParamSpec * pspec) @@ -1000,6 +1398,16 @@ gst_input_selector_get_property (GObject * object, guint prop_id, g_value_set_boolean (value, sel->sync_streams); GST_INPUT_SELECTOR_UNLOCK (object); break; + case PROP_SYNC_MODE: + GST_INPUT_SELECTOR_LOCK (object); + g_value_set_enum (value, sel->sync_mode); + GST_INPUT_SELECTOR_UNLOCK (object); + break; + case PROP_CACHE_BUFFERS: + GST_INPUT_SELECTOR_LOCK (object); + g_value_set_boolean (value, sel->cache_buffers); + GST_INPUT_SELECTOR_UNLOCK (object); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; diff --git a/plugins/elements/gstinputselector.h b/plugins/elements/gstinputselector.h index 8a5248a0d4..dd48a51087 100644 --- a/plugins/elements/gstinputselector.h +++ b/plugins/elements/gstinputselector.h @@ -48,6 +48,18 @@ typedef struct _GstInputSelectorClass GstInputSelectorClass; GST_INPUT_SELECTOR_GET_LOCK(sel))) #define GST_INPUT_SELECTOR_BROADCAST(sel) (g_cond_broadcast (GST_INPUT_SELECTOR_GET_COND(sel))) +/** + * GstInputSelectorSyncMode: + * @GST_INPUT_SELECTOR_SYNC_MODE_ACTIVE_SEGMENT: Sync using the current active segment. + * @GST_INPUT_SELECTOR_SYNC_MODE_CLOCK: Sync using the clock. + * + * The different ways that input-selector can behave when in sync-streams mode. + */ +typedef enum { + GST_INPUT_SELECTOR_SYNC_MODE_ACTIVE_SEGMENT, + GST_INPUT_SELECTOR_SYNC_MODE_CLOCK +} GstInputSelectorSyncMode; + struct _GstInputSelector { GstElement element; @@ -57,6 +69,8 @@ struct _GstInputSelector { guint n_pads; guint padcount; gboolean sync_streams; + GstInputSelectorSyncMode sync_mode; + gboolean cache_buffers; GMutex lock; GCond cond;