diff --git a/subprojects/gst-plugins-good/gst/multifile/gstsplitmuxpartreader.c b/subprojects/gst-plugins-good/gst/multifile/gstsplitmuxpartreader.c index 79a4edcabc..bd6d5078dc 100644 --- a/subprojects/gst-plugins-good/gst/multifile/gstsplitmuxpartreader.c +++ b/subprojects/gst-plugins-good/gst/multifile/gstsplitmuxpartreader.c @@ -152,7 +152,7 @@ handle_buffer_measuring (GstSplitMuxPartReader * reader, part_pad->seen_buffer = TRUE; /* Adjust buffer timestamps */ - offset = reader->start_offset + part_pad->segment.base; + offset = reader->info.start_offset + part_pad->segment.base; offset -= part_pad->initial_ts_offset; /* We don't add the ts_offset here, because we * want to measure the logical length of the stream, @@ -245,7 +245,7 @@ splitmux_part_pad_chain (GstPad * pad, GstObject * parent, GstBuffer * buf) } /* Adjust buffer timestamps */ - offset = reader->start_offset + part_pad->segment.base; + offset = reader->info.start_offset + part_pad->segment.base; offset -= part_pad->initial_ts_offset; offset += reader->ts_offset; @@ -418,11 +418,11 @@ splitmux_part_pad_event (GstPad * pad, GstObject * parent, GstEvent * event) * adding a fixed offset so that DTS is never negative */ if (seg->stop != -1) { seg->stop -= seg->start; - seg->stop += seg->time + reader->start_offset + reader->ts_offset; + seg->stop += seg->time + reader->info.start_offset + reader->ts_offset; } - seg->start = seg->time + reader->start_offset + reader->ts_offset; - seg->time += reader->start_offset; - seg->position += reader->start_offset; + seg->start = seg->time + reader->info.start_offset + reader->ts_offset; + seg->time += reader->info.start_offset; + seg->position += reader->info.start_offset; /* Replace event */ gst_event_unref (event); @@ -448,7 +448,7 @@ splitmux_part_pad_event (GstPad * pad, GstObject * parent, GstEvent * event) GST_DEBUG_OBJECT (reader, "Adjusting segment stop by %" GST_TIME_FORMAT " output now %" GST_SEGMENT_FORMAT, - GST_TIME_ARGS (reader->start_offset), &target->segment); + GST_TIME_ARGS (reader->info.start_offset), &target->segment); } } GST_LOG_OBJECT (pad, "Forwarding segment %" GST_PTR_FORMAT, event); @@ -684,8 +684,11 @@ gst_splitmux_part_reader_init (GstSplitMuxPartReader * reader) { GstElement *typefind; + reader->prep_state = PART_STATE_NULL; + reader->need_duration_measuring = TRUE; + reader->active = FALSE; - reader->duration = GST_CLOCK_TIME_NONE; + reader->info.duration = GST_CLOCK_TIME_NONE; g_cond_init (&reader->inactive_cond); g_mutex_init (&reader->lock); @@ -814,7 +817,6 @@ new_decoded_pad_added_cb (GstElement * element, GstPad * pad, GstSplitMuxPartReader * reader) { GstPad *out_pad = NULL; - GstSplitMuxPartPad *proxy_pad; GstCaps *caps; GstPadLinkReturn link_ret; @@ -835,7 +837,8 @@ new_decoded_pad_added_cb (GstElement * element, GstPad * pad, } /* Create our proxy pad to interact with this new pad */ - proxy_pad = gst_splitmux_part_reader_new_proxy_pad (reader, out_pad); + GstSplitMuxPartPad *proxy_pad = + gst_splitmux_part_reader_new_proxy_pad (reader, out_pad); GST_DEBUG_OBJECT (reader, "created proxy pad %" GST_PTR_FORMAT " for target %" GST_PTR_FORMAT, proxy_pad, out_pad); @@ -914,12 +917,12 @@ gst_splitmux_part_reader_seek_to_segment (GstSplitMuxPartReader * reader, flags = target_seg->flags | GST_SEEK_FLAG_FLUSH | extra_flags; SPLITMUX_PART_LOCK (reader); - if (target_seg->start >= reader->start_offset) - start = target_seg->start - reader->start_offset; + if (target_seg->start >= reader->info.start_offset) + start = target_seg->start - reader->info.start_offset; /* If the segment stop is within this part, don't play to the end */ if (target_seg->stop != -1 && - target_seg->stop < reader->start_offset + reader->duration) - stop = target_seg->stop - reader->start_offset; + target_seg->stop < reader->info.start_offset + reader->info.duration) + stop = target_seg->stop - reader->info.start_offset; SPLITMUX_PART_UNLOCK (reader); @@ -942,9 +945,9 @@ gst_splitmux_part_reader_measure_streams (GstSplitMuxPartReader * reader) * to EOS in order to find the smallest end timestamp to start the next * file from */ - if (GST_CLOCK_TIME_IS_VALID (reader->duration) - && reader->duration > GST_SECOND) { - GstClockTime seek_ts = reader->duration - (0.5 * GST_SECOND); + if (GST_CLOCK_TIME_IS_VALID (reader->info.duration) + && reader->info.duration > GST_SECOND) { + GstClockTime seek_ts = reader->info.duration - (0.5 * GST_SECOND); gst_splitmux_part_reader_seek_to_time_locked (reader, seek_ts); } SPLITMUX_PART_UNLOCK (reader); @@ -956,17 +959,31 @@ gst_splitmux_part_reader_finish_measuring_streams (GstSplitMuxPartReader * { SPLITMUX_PART_LOCK (reader); if (reader->prep_state == PART_STATE_PREPARING_RESET_FOR_READY) { + GstClockTime end_offset = GST_CLOCK_TIME_NONE; + /* Fire the prepared signal and go to READY state */ - GST_DEBUG_OBJECT (reader, - "Stream measuring complete. File %s is now ready", reader->path); reader->prep_state = PART_STATE_READY; - for (GList * cur = g_list_first (reader->pads); cur != NULL; - cur = g_list_next (cur)) { - GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data); - GST_WARNING_OBJECT (part_pad, - "Finished measuring. MinTS seen %" GST_TIMEP_FORMAT " MaxTS seen %" - GST_TIMEP_FORMAT, &part_pad->min_ts, &part_pad->max_ts); + if (reader->need_duration_measuring) { + for (GList * cur = g_list_first (reader->pads); cur != NULL; + cur = g_list_next (cur)) { + GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data); + GST_WARNING_OBJECT (part_pad, + "Finished measuring. MinTS seen %" GST_TIMEP_FORMAT " MaxTS seen %" + GST_TIMEP_FORMAT, &part_pad->min_ts, &part_pad->max_ts); + + if (!part_pad->is_sparse && part_pad->max_ts < end_offset) { + end_offset = part_pad->max_ts; + } + } + GST_DEBUG_OBJECT (reader, + "Stream measuring complete. File %s is now ready. End offset %" + GST_TIMEP_FORMAT, reader->path, &end_offset); + + reader->end_offset = end_offset; + reader->need_duration_measuring = FALSE; // We won't re-measure this part } + + SPLITMUX_PART_BROADCAST (reader); SPLITMUX_PART_UNLOCK (reader); do_async_done (reader); } else { @@ -1036,12 +1053,21 @@ check_if_pads_collected (GstSplitMuxPartReader * reader) if (reader->prep_state == PART_STATE_PREPARING_COLLECT_STREAMS) { /* Check we have all pads and each pad has seen a buffer */ if (reader->no_more_pads && splitmux_part_is_prerolled_locked (reader)) { - GST_DEBUG_OBJECT (reader, - "no more pads - file %s. Measuring stream length", reader->path); - reader->prep_state = PART_STATE_PREPARING_MEASURE_STREAMS; - gst_element_call_async (GST_ELEMENT_CAST (reader), - (GstElementCallAsyncFunc) gst_splitmux_part_reader_measure_streams, - NULL, NULL); + if (reader->need_duration_measuring) { + /* Need to measure duration before finishing */ + GST_DEBUG_OBJECT (reader, + "no more pads - file %s. Measuring stream length", reader->path); + reader->prep_state = PART_STATE_PREPARING_MEASURE_STREAMS; + gst_element_call_async (GST_ELEMENT_CAST (reader), + (GstElementCallAsyncFunc) gst_splitmux_part_reader_measure_streams, + NULL, NULL); + } else { + reader->prep_state = PART_STATE_PREPARING_RESET_FOR_READY; + + gst_element_call_async (GST_ELEMENT_CAST (reader), + (GstElementCallAsyncFunc) + gst_splitmux_part_reader_finish_measuring_streams, NULL, NULL); + } } } } @@ -1069,7 +1095,7 @@ no_more_pads (GstElement * element, GstSplitMuxPartReader * reader) } GST_INFO_OBJECT (reader, "file %s duration %" GST_TIME_FORMAT, reader->path, GST_TIME_ARGS (duration)); - reader->duration = (GstClockTime) duration; + reader->info.duration = (GstClockTime) duration; reader->no_more_pads = TRUE; @@ -1114,7 +1140,7 @@ gst_splitmux_part_reader_src_query (GstSplitMuxPartReader * part, if (fmt != GST_FORMAT_TIME) return FALSE; SPLITMUX_PART_LOCK (part); - position += part->start_offset; + position += part->info.start_offset; GST_LOG_OBJECT (part, "Position %" GST_TIME_FORMAT, GST_TIME_ARGS (position)); SPLITMUX_PART_UNLOCK (part); @@ -1186,6 +1212,7 @@ gst_splitmux_part_reader_change_state (GstElement * element, break; case GST_STATE_CHANGE_PAUSED_TO_READY: do_async_done (reader); + splitmux_part_reader_reset (reader); break; case GST_STATE_CHANGE_PAUSED_TO_PLAYING: SPLITMUX_PART_LOCK (reader); @@ -1196,7 +1223,6 @@ gst_splitmux_part_reader_change_state (GstElement * element, break; case GST_STATE_CHANGE_READY_TO_NULL: reader->prep_state = PART_STATE_NULL; - splitmux_part_reader_reset (reader); break; default: break; @@ -1219,12 +1245,52 @@ gst_splitmux_part_reader_prepare (GstSplitMuxPartReader * part) return TRUE; } +static gboolean +gst_splitmux_part_reader_prepare_sync (GstSplitMuxPartReader * reader) +{ + GstStateChangeReturn ret; + + ret = gst_element_set_state (GST_ELEMENT_CAST (reader), GST_STATE_PAUSED); + if (ret == GST_STATE_CHANGE_FAILURE) + return FALSE; + + if (ret == GST_STATE_CHANGE_ASYNC) { + SPLITMUX_PART_LOCK (reader); + while (reader->running && reader->prep_state != PART_STATE_READY) { + if (reader->prep_state == PART_STATE_FAILED) { + SPLITMUX_PART_UNLOCK (reader); + return FALSE; + } + + GST_LOG_OBJECT (reader, + "Waiting for prepare (or failure) on reader %s", reader->path); + SPLITMUX_PART_WAIT (reader); + } + + SPLITMUX_PART_UNLOCK (reader); + } + + return TRUE; +} + void gst_splitmux_part_reader_unprepare (GstSplitMuxPartReader * part) { gst_element_set_state (GST_ELEMENT_CAST (part), GST_STATE_NULL); } +gboolean +gst_splitmux_part_reader_is_running (GstSplitMuxPartReader * part) +{ + gboolean ret; + + SPLITMUX_PART_LOCK (part); + ret = part->running; + SPLITMUX_PART_UNLOCK (part); + + return ret; +} + void gst_splitmux_part_reader_set_location (GstSplitMuxPartReader * reader, const gchar * path) @@ -1238,6 +1304,11 @@ gst_splitmux_part_reader_activate (GstSplitMuxPartReader * reader, { GST_DEBUG_OBJECT (reader, "Activating part reader"); + if (!gst_splitmux_part_reader_prepare_sync (reader)) { + GST_ERROR_OBJECT (reader, "Failed to prepare part before activation"); + return FALSE; + } + if (!gst_splitmux_part_reader_seek_to_segment (reader, seg, extra_flags)) { GST_ERROR_OBJECT (reader, "Failed to seek part to %" GST_SEGMENT_FORMAT, seg); @@ -1270,6 +1341,13 @@ gst_splitmux_part_reader_deactivate (GstSplitMuxPartReader * reader) gst_element_set_state (GST_ELEMENT_CAST (reader), GST_STATE_PAUSED); } +void +gst_splitmux_part_reader_stop (GstSplitMuxPartReader * reader) +{ + GST_DEBUG_OBJECT (reader, "Stopping reader tasks"); + gst_element_set_state (GST_ELEMENT_CAST (reader), GST_STATE_READY); +} + void gst_splitmux_part_reader_set_flushing_locked (GstSplitMuxPartReader * reader, gboolean flushing) @@ -1297,16 +1375,10 @@ gst_splitmux_part_reader_set_callbacks (GstSplitMuxPartReader * reader, GstClockTime gst_splitmux_part_reader_get_end_offset (GstSplitMuxPartReader * reader) { - GList *cur; GstClockTime ret = GST_CLOCK_TIME_NONE; SPLITMUX_PART_LOCK (reader); - for (cur = g_list_first (reader->pads); cur != NULL; cur = g_list_next (cur)) { - GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data); - if (!part_pad->is_sparse && part_pad->max_ts < ret) - ret = part_pad->max_ts; - } - + ret = reader->end_offset; SPLITMUX_PART_UNLOCK (reader); return ret; @@ -1317,7 +1389,7 @@ gst_splitmux_part_reader_set_start_offset (GstSplitMuxPartReader * reader, GstClockTime time_offset, GstClockTime ts_offset) { SPLITMUX_PART_LOCK (reader); - reader->start_offset = time_offset; + reader->info.start_offset = time_offset; reader->ts_offset = ts_offset; GST_INFO_OBJECT (reader, "Time offset now %" GST_TIME_FORMAT, GST_TIME_ARGS (time_offset)); @@ -1330,7 +1402,7 @@ gst_splitmux_part_reader_get_start_offset (GstSplitMuxPartReader * reader) GstClockTime ret = GST_CLOCK_TIME_NONE; SPLITMUX_PART_LOCK (reader); - ret = reader->start_offset; + ret = reader->info.start_offset; SPLITMUX_PART_UNLOCK (reader); return ret; @@ -1342,7 +1414,7 @@ gst_splitmux_part_reader_get_duration (GstSplitMuxPartReader * reader) GstClockTime dur; SPLITMUX_PART_LOCK (reader); - dur = reader->duration; + dur = reader->info.duration; SPLITMUX_PART_UNLOCK (reader); return dur; @@ -1388,6 +1460,7 @@ gst_splitmux_part_reader_pop (GstSplitMuxPartReader * reader, GstPad * pad, /* Have to drop the lock around pop, so we can be woken up for flush */ SPLITMUX_PART_UNLOCK (reader); if (!gst_data_queue_pop (q, item) || (*item == NULL)) { + GST_LOG_OBJECT (part_pad, "Popped null item -> flushing"); ret = GST_FLOW_FLUSHING; goto out; } @@ -1398,8 +1471,10 @@ gst_splitmux_part_reader_pop (GstSplitMuxPartReader * reader, GstPad * pad, if (GST_IS_EVENT ((*item)->object)) { GstEvent *e = (GstEvent *) ((*item)->object); /* Mark this pad as EOS */ - if (GST_EVENT_TYPE (e) == GST_EVENT_EOS) + if (GST_EVENT_TYPE (e) == GST_EVENT_EOS) { + GST_LOG_OBJECT (part_pad, "popping EOS event"); part_pad->is_eos = TRUE; + } } SPLITMUX_PART_UNLOCK (reader); diff --git a/subprojects/gst-plugins-good/gst/multifile/gstsplitmuxpartreader.h b/subprojects/gst-plugins-good/gst/multifile/gstsplitmuxpartreader.h index 78ecc6eb5a..2b92d78af2 100644 --- a/subprojects/gst-plugins-good/gst/multifile/gstsplitmuxpartreader.h +++ b/subprojects/gst-plugins-good/gst/multifile/gstsplitmuxpartreader.h @@ -37,6 +37,7 @@ G_BEGIN_DECLS typedef struct _GstSplitMuxPartReader GstSplitMuxPartReader; typedef struct _GstSplitMuxPartReaderClass GstSplitMuxPartReaderClass; +typedef struct _GstSplitMuxPartReaderInfo GstSplitMuxPartReaderInfo; typedef struct _SplitMuxSrcPad SplitMuxSrcPad; typedef struct _SplitMuxSrcPadClass SplitMuxSrcPadClass; @@ -52,11 +53,18 @@ typedef enum typedef GstPad *(*GstSplitMuxPartReaderPadCb)(GstSplitMuxPartReader *reader, GstPad *src_pad, gpointer cb_data); +struct _GstSplitMuxPartReaderInfo +{ + GstClockTime duration; + GstClockTime start_offset; +}; + struct _GstSplitMuxPartReader { GstPipeline parent; GstSplitMuxPartState prep_state; + gboolean need_duration_measuring; gchar *path; @@ -71,9 +79,10 @@ struct _GstSplitMuxPartReader gboolean flushing; gboolean no_more_pads; - GstClockTime duration; - GstClockTime start_offset; + GstSplitMuxPartReaderInfo info; + GstClockTime ts_offset; + GstClockTime end_offset; GList *pads; @@ -100,6 +109,7 @@ void gst_splitmux_part_reader_set_callbacks (GstSplitMuxPartReader *reader, gpointer cb_data, GstSplitMuxPartReaderPadCb get_pad_cb); gboolean gst_splitmux_part_reader_prepare (GstSplitMuxPartReader *part); void gst_splitmux_part_reader_unprepare (GstSplitMuxPartReader *part); +gboolean gst_splitmux_part_reader_is_running (GstSplitMuxPartReader *part); void gst_splitmux_part_reader_set_location (GstSplitMuxPartReader *reader, const gchar *path); gboolean gst_splitmux_part_is_eos (GstSplitMuxPartReader *reader); @@ -108,6 +118,8 @@ gboolean gst_splitmux_part_reader_activate (GstSplitMuxPartReader *part, GstSegm void gst_splitmux_part_reader_deactivate (GstSplitMuxPartReader *part); gboolean gst_splitmux_part_reader_is_active (GstSplitMuxPartReader *part); +void gst_splitmux_part_reader_stop (GstSplitMuxPartReader *part); + gboolean gst_splitmux_part_reader_src_query (GstSplitMuxPartReader *part, GstPad *src_pad, GstQuery * query); void gst_splitmux_part_reader_set_start_offset (GstSplitMuxPartReader *part, GstClockTime time_offset, GstClockTime ts_offset); GstClockTime gst_splitmux_part_reader_get_start_offset (GstSplitMuxPartReader *part); diff --git a/subprojects/gst-plugins-good/gst/multifile/gstsplitmuxsrc.c b/subprojects/gst-plugins-good/gst/multifile/gstsplitmuxsrc.c index 1b1359efc5..99679115b2 100644 --- a/subprojects/gst-plugins-good/gst/multifile/gstsplitmuxsrc.c +++ b/subprojects/gst-plugins-good/gst/multifile/gstsplitmuxsrc.c @@ -60,9 +60,12 @@ GST_DEBUG_CATEGORY (splitmux_debug); enum { PROP_0, - PROP_LOCATION + PROP_LOCATION, + PROP_NUM_OPEN_FRAGMENTS }; +#define DEFAULT_OPEN_FRAGMENTS 0 + enum { SIGNAL_FORMAT_LOCATION, @@ -234,6 +237,13 @@ gst_splitmux_src_class_init (GstSplitMuxSrcClass * klass) g_param_spec_string ("location", "File Input Pattern", "Glob pattern for the location of the files to read", NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + g_object_class_install_property (gobject_class, PROP_NUM_OPEN_FRAGMENTS, + g_param_spec_uint ("num-open-fragments", "Open files limit", + "Number of files to keep open simultaneously. " + "(0 = open all fragments at the start). " + "May still use slightly more if set to less than the number of streams in the files", + 0, G_MAXUINT, DEFAULT_OPEN_FRAGMENTS, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); /** * GstSplitMuxSrc::format-location: @@ -257,6 +267,7 @@ gst_splitmux_src_init (GstSplitMuxSrc * splitmux) g_rw_lock_init (&splitmux->pads_rwlock); splitmux->total_duration = GST_CLOCK_TIME_NONE; gst_segment_init (&splitmux->play_segment, GST_FORMAT_TIME); + splitmux->target_max_readers = DEFAULT_OPEN_FRAGMENTS; } static void @@ -305,6 +316,11 @@ gst_splitmux_src_set_property (GObject * object, guint prop_id, GST_OBJECT_UNLOCK (splitmux); break; } + case PROP_NUM_OPEN_FRAGMENTS: + GST_OBJECT_LOCK (splitmux); + splitmux->target_max_readers = g_value_get_uint (value); + GST_OBJECT_UNLOCK (splitmux); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -323,6 +339,11 @@ gst_splitmux_src_get_property (GObject * object, guint prop_id, g_value_set_string (value, splitmux->location); GST_OBJECT_UNLOCK (splitmux); break; + case PROP_NUM_OPEN_FRAGMENTS: + GST_OBJECT_LOCK (splitmux); + g_value_set_uint (value, splitmux->target_max_readers); + GST_OBJECT_UNLOCK (splitmux); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -433,9 +454,6 @@ gst_splitmux_part_bus_handler (GstBus * bus, GstMessage * msg, gboolean need_no_more_pads; if (idx >= splitmux->num_parts) { - /* Shouldn't really happen! */ - do_async_done (splitmux); - g_warn_if_reached (); break; } @@ -544,7 +562,7 @@ gst_splitmux_part_bus_handler (GstBus * bus, GstMessage * msg, } static GstSplitMuxPartReader * -gst_splitmux_part_create (GstSplitMuxSrc * splitmux, char *filename) +gst_splitmux_part_reader_create (GstSplitMuxSrc * splitmux, char *filename) { GstSplitMuxPartReader *r; GstBus *bus; @@ -830,20 +848,69 @@ flushing: return; } +static void +reduce_active_readers (GstSplitMuxSrc * splitmux) +{ + /* Try and reduce the active reader count by deactivating the + * oldest reader if it's no longer in use */ + if (splitmux->target_max_readers == 0) { + return; + } + while (g_queue_get_length (splitmux->active_parts) >= + splitmux->target_max_readers) { + GstSplitMuxPartReader *oldest_reader = + g_queue_peek_head (splitmux->active_parts); + if (gst_splitmux_part_reader_is_active (oldest_reader)) { + return; + } + + GST_DEBUG_OBJECT (splitmux, "Stopping least recently used part %s", + oldest_reader->path); + oldest_reader = g_queue_pop_head (splitmux->active_parts); + gst_splitmux_part_reader_stop (oldest_reader); + g_object_unref (oldest_reader); + } +} + +static void +add_to_active_readers (GstSplitMuxSrc * splitmux, + GstSplitMuxPartReader * reader) +{ + if (splitmux->target_max_readers != 0) { + /* Check if it's already in the active reader pool, and move this reader + * to the tail, or else add a ref and push it on the tail */ + if (gst_splitmux_part_reader_is_running (reader)) { + /* Already in the queue, and reffed, move it to the end without + * adding another ref */ + gboolean in_queue = g_queue_remove (splitmux->active_parts, reader); + g_assert (in_queue == TRUE); + } else { + /* Putting it in the queue. Add a ref */ + g_object_ref (reader); + /* When adding a new reader to the list, reduce active readers first */ + reduce_active_readers (splitmux); + } + g_queue_push_tail (splitmux->active_parts, reader); + } +} + static gboolean gst_splitmux_src_activate_part (GstSplitMuxSrc * splitmux, guint part, GstSeekFlags extra_flags) { - GList *cur; - GST_DEBUG_OBJECT (splitmux, "Activating part %d", part); + GstSplitMuxPartReader *reader = splitmux->parts[part]; splitmux->cur_part = part; - if (!gst_splitmux_part_reader_activate (splitmux->parts[part], - &splitmux->play_segment, extra_flags)) + add_to_active_readers (splitmux, reader); + if (!gst_splitmux_part_reader_activate (reader, + &splitmux->play_segment, extra_flags)) { return FALSE; + } SPLITMUX_SRC_PADS_RLOCK (splitmux); + GList *cur; + for (cur = g_list_first (splitmux->pads); cur != NULL; cur = g_list_next (cur)) { SplitMuxSrcPad *splitpad = (SplitMuxSrcPad *) (cur->data); @@ -879,6 +946,8 @@ gst_splitmux_src_prepare_next_part (GstSplitMuxSrc * splitmux) GST_DEBUG_OBJECT (splitmux, "Preparing file part %s (%u)", splitmux->parts[idx]->path, idx); + add_to_active_readers (splitmux, splitmux->parts[idx]); + gst_splitmux_part_reader_set_start_offset (splitmux->parts[idx], splitmux->end_offset, FIXED_TS_OFFSET); if (!gst_splitmux_part_reader_prepare (splitmux->parts[idx])) { @@ -939,13 +1008,15 @@ gst_splitmux_src_start (GstSplitMuxSrc * splitmux) splitmux->running = TRUE; SPLITMUX_SRC_UNLOCK (splitmux); + splitmux->active_parts = g_queue_new (); + splitmux->num_parts = g_strv_length (files); splitmux->parts = g_new0 (GstSplitMuxPartReader *, splitmux->num_parts); /* Create all part pipelines */ for (i = 0; i < splitmux->num_parts; i++) { - splitmux->parts[i] = gst_splitmux_part_create (splitmux, files[i]); + splitmux->parts[i] = gst_splitmux_part_reader_create (splitmux, files[i]); if (splitmux->parts[i] == NULL) break; } @@ -1008,16 +1079,12 @@ gst_splitmux_src_stop (GstSplitMuxSrc * splitmux) splitmux->running = FALSE; GST_DEBUG_OBJECT (splitmux, "Stopping"); - SPLITMUX_SRC_UNLOCK (splitmux); - - /* Stop all part readers. We don't need the lock here, - * because all parts were created in _start() */ + /* Stop all part readers. */ for (i = 0; i < splitmux->num_created_parts; i++) { if (splitmux->parts[i] == NULL) continue; gst_splitmux_part_reader_unprepare (splitmux->parts[i]); } - SPLITMUX_SRC_LOCK (splitmux); SPLITMUX_SRC_PADS_WLOCK (splitmux); pads_list = splitmux->pads; @@ -1034,6 +1101,8 @@ gst_splitmux_src_stop (GstSplitMuxSrc * splitmux) SPLITMUX_SRC_LOCK (splitmux); /* Now the pad task is stopped we can destroy the readers */ + g_queue_free_full (splitmux->active_parts, g_object_unref); + for (i = 0; i < splitmux->num_created_parts; i++) { if (splitmux->parts[i] == NULL) continue; @@ -1047,6 +1116,7 @@ gst_splitmux_src_stop (GstSplitMuxSrc * splitmux) splitmux->num_prepared_parts = 0; splitmux->num_created_parts = 0; splitmux->total_duration = GST_CLOCK_TIME_NONE; + /* Reset playback segment */ gst_segment_init (&splitmux->play_segment, GST_FORMAT_TIME); out: @@ -1087,7 +1157,6 @@ gst_splitmux_find_output_pad (GstSplitMuxPartReader * part, GstPad * pad, GstPad *target = NULL; gboolean is_new_pad = FALSE; - SPLITMUX_SRC_LOCK (splitmux); SPLITMUX_SRC_PADS_WLOCK (splitmux); for (cur = g_list_first (splitmux->pads); cur != NULL; cur = g_list_next (cur)) { @@ -1116,7 +1185,6 @@ gst_splitmux_find_output_pad (GstSplitMuxPartReader * part, GstPad * pad, is_new_pad = TRUE; } SPLITMUX_SRC_PADS_WUNLOCK (splitmux); - SPLITMUX_SRC_UNLOCK (splitmux); g_free (pad_name); @@ -1212,8 +1280,11 @@ gst_splitmux_end_of_part (GstSplitMuxSrc * splitmux, SplitMuxSrcPad * splitpad) SPLITMUX_SRC_LOCK (splitmux); /* If all pads are done with this part, deactivate it */ - if (gst_splitmux_part_is_eos (splitmux->parts[splitpad->cur_part])) + if (gst_splitmux_part_is_eos (splitmux->parts[splitpad->cur_part])) { + GST_DEBUG_OBJECT (splitmux, "All pads in part %d finished. Deactivating it", + cur_part); gst_splitmux_part_reader_deactivate (splitmux->parts[cur_part]); + } if (splitmux->play_segment.rate >= 0.0) { if (splitmux->play_segment.stop != -1) { @@ -1244,11 +1315,6 @@ gst_splitmux_end_of_part (GstSplitMuxSrc * splitmux, SplitMuxSrcPad * splitpad) " moving to part %d", splitpad, next_part); splitpad->cur_part = next_part; splitpad->reader = splitmux->parts[splitpad->cur_part]; - if (splitpad->part_pad) - gst_object_unref (splitpad->part_pad); - splitpad->part_pad = - gst_splitmux_part_reader_lookup_pad (splitpad->reader, - (GstPad *) (splitpad)); if (splitmux->cur_part != next_part) { if (!gst_splitmux_part_reader_is_active (splitpad->reader)) { @@ -1264,12 +1330,22 @@ gst_splitmux_end_of_part (GstSplitMuxSrc * splitmux, SplitMuxSrcPad * splitpad) GST_DEBUG_OBJECT (splitpad, "First pad to change part. Activating part %d with seg %" GST_SEGMENT_FORMAT, next_part, &tmp); + add_to_active_readers (splitmux, splitpad->reader); + if (!gst_splitmux_part_reader_activate (splitpad->reader, &tmp, - GST_SEEK_FLAG_NONE)) + GST_SEEK_FLAG_NONE)) { goto error; + } } splitmux->cur_part = next_part; } + + if (splitpad->part_pad) + gst_object_unref (splitpad->part_pad); + splitpad->part_pad = + gst_splitmux_part_reader_lookup_pad (splitpad->reader, + (GstPad *) (splitpad)); + res = TRUE; } @@ -1424,12 +1500,15 @@ splitmux_src_pad_event (GstPad * pad, GstObject * parent, GstEvent * event) splitmux->segment_seqnum = seqnum; /* Work out where to start from now */ - for (i = 0; i < splitmux->num_parts; i++) { - GstSplitMuxPartReader *reader = splitmux->parts[i]; - GstClockTime part_end = - gst_splitmux_part_reader_get_end_offset (reader); + for (i = 0; i < splitmux->num_parts - 1; i++) { + GstSplitMuxPartReader *reader = splitmux->parts[i + 1]; + GstClockTime part_start = + gst_splitmux_part_reader_get_start_offset (reader); - if (part_end > position) + GST_LOG_OBJECT (splitmux, "Part %d has start offset %" GST_TIMEP_FORMAT + " (want position %" GST_TIMEP_FORMAT ")", i, &part_start, + &position); + if (position < part_start) break; } if (i == splitmux->num_parts) diff --git a/subprojects/gst-plugins-good/gst/multifile/gstsplitmuxsrc.h b/subprojects/gst-plugins-good/gst/multifile/gstsplitmuxsrc.h index f13ee08729..68698ea2bf 100644 --- a/subprojects/gst-plugins-good/gst/multifile/gstsplitmuxsrc.h +++ b/subprojects/gst-plugins-good/gst/multifile/gstsplitmuxsrc.h @@ -67,6 +67,9 @@ struct _GstSplitMuxSrc GstClockTime end_offset; GstSegment play_segment; guint32 segment_seqnum; + + guint target_max_readers; /* Maximum number of readers we try to keep open */ + GQueue *active_parts; }; struct _GstSplitMuxSrcClass