splitmuxsrc: Add num-open-fragments property

Add a property to limit the number of parts splitmux will open
simultaneously. Modify the part handling to support deactivating
and reactivating the demuxing for each part.

The default is '0', to preserve the existing behaviour of opening
all parts at the beginning.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/7053>
This commit is contained in:
Jan Schmidt 2024-04-30 00:33:35 +10:00
parent eeb5a42b5d
commit 1821b52dd5
4 changed files with 245 additions and 76 deletions

View file

@ -152,7 +152,7 @@ handle_buffer_measuring (GstSplitMuxPartReader * reader,
part_pad->seen_buffer = TRUE;
/* Adjust buffer timestamps */
offset = reader->start_offset + part_pad->segment.base;
offset = reader->info.start_offset + part_pad->segment.base;
offset -= part_pad->initial_ts_offset;
/* We don't add the ts_offset here, because we
* want to measure the logical length of the stream,
@ -245,7 +245,7 @@ splitmux_part_pad_chain (GstPad * pad, GstObject * parent, GstBuffer * buf)
}
/* Adjust buffer timestamps */
offset = reader->start_offset + part_pad->segment.base;
offset = reader->info.start_offset + part_pad->segment.base;
offset -= part_pad->initial_ts_offset;
offset += reader->ts_offset;
@ -418,11 +418,11 @@ splitmux_part_pad_event (GstPad * pad, GstObject * parent, GstEvent * event)
* adding a fixed offset so that DTS is never negative */
if (seg->stop != -1) {
seg->stop -= seg->start;
seg->stop += seg->time + reader->start_offset + reader->ts_offset;
seg->stop += seg->time + reader->info.start_offset + reader->ts_offset;
}
seg->start = seg->time + reader->start_offset + reader->ts_offset;
seg->time += reader->start_offset;
seg->position += reader->start_offset;
seg->start = seg->time + reader->info.start_offset + reader->ts_offset;
seg->time += reader->info.start_offset;
seg->position += reader->info.start_offset;
/* Replace event */
gst_event_unref (event);
@ -448,7 +448,7 @@ splitmux_part_pad_event (GstPad * pad, GstObject * parent, GstEvent * event)
GST_DEBUG_OBJECT (reader,
"Adjusting segment stop by %" GST_TIME_FORMAT
" output now %" GST_SEGMENT_FORMAT,
GST_TIME_ARGS (reader->start_offset), &target->segment);
GST_TIME_ARGS (reader->info.start_offset), &target->segment);
}
}
GST_LOG_OBJECT (pad, "Forwarding segment %" GST_PTR_FORMAT, event);
@ -684,8 +684,11 @@ gst_splitmux_part_reader_init (GstSplitMuxPartReader * reader)
{
GstElement *typefind;
reader->prep_state = PART_STATE_NULL;
reader->need_duration_measuring = TRUE;
reader->active = FALSE;
reader->duration = GST_CLOCK_TIME_NONE;
reader->info.duration = GST_CLOCK_TIME_NONE;
g_cond_init (&reader->inactive_cond);
g_mutex_init (&reader->lock);
@ -814,7 +817,6 @@ new_decoded_pad_added_cb (GstElement * element, GstPad * pad,
GstSplitMuxPartReader * reader)
{
GstPad *out_pad = NULL;
GstSplitMuxPartPad *proxy_pad;
GstCaps *caps;
GstPadLinkReturn link_ret;
@ -835,7 +837,8 @@ new_decoded_pad_added_cb (GstElement * element, GstPad * pad,
}
/* Create our proxy pad to interact with this new pad */
proxy_pad = gst_splitmux_part_reader_new_proxy_pad (reader, out_pad);
GstSplitMuxPartPad *proxy_pad =
gst_splitmux_part_reader_new_proxy_pad (reader, out_pad);
GST_DEBUG_OBJECT (reader,
"created proxy pad %" GST_PTR_FORMAT " for target %" GST_PTR_FORMAT,
proxy_pad, out_pad);
@ -914,12 +917,12 @@ gst_splitmux_part_reader_seek_to_segment (GstSplitMuxPartReader * reader,
flags = target_seg->flags | GST_SEEK_FLAG_FLUSH | extra_flags;
SPLITMUX_PART_LOCK (reader);
if (target_seg->start >= reader->start_offset)
start = target_seg->start - reader->start_offset;
if (target_seg->start >= reader->info.start_offset)
start = target_seg->start - reader->info.start_offset;
/* If the segment stop is within this part, don't play to the end */
if (target_seg->stop != -1 &&
target_seg->stop < reader->start_offset + reader->duration)
stop = target_seg->stop - reader->start_offset;
target_seg->stop < reader->info.start_offset + reader->info.duration)
stop = target_seg->stop - reader->info.start_offset;
SPLITMUX_PART_UNLOCK (reader);
@ -942,9 +945,9 @@ gst_splitmux_part_reader_measure_streams (GstSplitMuxPartReader * reader)
* to EOS in order to find the smallest end timestamp to start the next
* file from
*/
if (GST_CLOCK_TIME_IS_VALID (reader->duration)
&& reader->duration > GST_SECOND) {
GstClockTime seek_ts = reader->duration - (0.5 * GST_SECOND);
if (GST_CLOCK_TIME_IS_VALID (reader->info.duration)
&& reader->info.duration > GST_SECOND) {
GstClockTime seek_ts = reader->info.duration - (0.5 * GST_SECOND);
gst_splitmux_part_reader_seek_to_time_locked (reader, seek_ts);
}
SPLITMUX_PART_UNLOCK (reader);
@ -956,17 +959,31 @@ gst_splitmux_part_reader_finish_measuring_streams (GstSplitMuxPartReader *
{
SPLITMUX_PART_LOCK (reader);
if (reader->prep_state == PART_STATE_PREPARING_RESET_FOR_READY) {
GstClockTime end_offset = GST_CLOCK_TIME_NONE;
/* Fire the prepared signal and go to READY state */
GST_DEBUG_OBJECT (reader,
"Stream measuring complete. File %s is now ready", reader->path);
reader->prep_state = PART_STATE_READY;
for (GList * cur = g_list_first (reader->pads); cur != NULL;
cur = g_list_next (cur)) {
GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data);
GST_WARNING_OBJECT (part_pad,
"Finished measuring. MinTS seen %" GST_TIMEP_FORMAT " MaxTS seen %"
GST_TIMEP_FORMAT, &part_pad->min_ts, &part_pad->max_ts);
if (reader->need_duration_measuring) {
for (GList * cur = g_list_first (reader->pads); cur != NULL;
cur = g_list_next (cur)) {
GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data);
GST_WARNING_OBJECT (part_pad,
"Finished measuring. MinTS seen %" GST_TIMEP_FORMAT " MaxTS seen %"
GST_TIMEP_FORMAT, &part_pad->min_ts, &part_pad->max_ts);
if (!part_pad->is_sparse && part_pad->max_ts < end_offset) {
end_offset = part_pad->max_ts;
}
}
GST_DEBUG_OBJECT (reader,
"Stream measuring complete. File %s is now ready. End offset %"
GST_TIMEP_FORMAT, reader->path, &end_offset);
reader->end_offset = end_offset;
reader->need_duration_measuring = FALSE; // We won't re-measure this part
}
SPLITMUX_PART_BROADCAST (reader);
SPLITMUX_PART_UNLOCK (reader);
do_async_done (reader);
} else {
@ -1036,12 +1053,21 @@ check_if_pads_collected (GstSplitMuxPartReader * reader)
if (reader->prep_state == PART_STATE_PREPARING_COLLECT_STREAMS) {
/* Check we have all pads and each pad has seen a buffer */
if (reader->no_more_pads && splitmux_part_is_prerolled_locked (reader)) {
GST_DEBUG_OBJECT (reader,
"no more pads - file %s. Measuring stream length", reader->path);
reader->prep_state = PART_STATE_PREPARING_MEASURE_STREAMS;
gst_element_call_async (GST_ELEMENT_CAST (reader),
(GstElementCallAsyncFunc) gst_splitmux_part_reader_measure_streams,
NULL, NULL);
if (reader->need_duration_measuring) {
/* Need to measure duration before finishing */
GST_DEBUG_OBJECT (reader,
"no more pads - file %s. Measuring stream length", reader->path);
reader->prep_state = PART_STATE_PREPARING_MEASURE_STREAMS;
gst_element_call_async (GST_ELEMENT_CAST (reader),
(GstElementCallAsyncFunc) gst_splitmux_part_reader_measure_streams,
NULL, NULL);
} else {
reader->prep_state = PART_STATE_PREPARING_RESET_FOR_READY;
gst_element_call_async (GST_ELEMENT_CAST (reader),
(GstElementCallAsyncFunc)
gst_splitmux_part_reader_finish_measuring_streams, NULL, NULL);
}
}
}
}
@ -1069,7 +1095,7 @@ no_more_pads (GstElement * element, GstSplitMuxPartReader * reader)
}
GST_INFO_OBJECT (reader, "file %s duration %" GST_TIME_FORMAT,
reader->path, GST_TIME_ARGS (duration));
reader->duration = (GstClockTime) duration;
reader->info.duration = (GstClockTime) duration;
reader->no_more_pads = TRUE;
@ -1114,7 +1140,7 @@ gst_splitmux_part_reader_src_query (GstSplitMuxPartReader * part,
if (fmt != GST_FORMAT_TIME)
return FALSE;
SPLITMUX_PART_LOCK (part);
position += part->start_offset;
position += part->info.start_offset;
GST_LOG_OBJECT (part, "Position %" GST_TIME_FORMAT,
GST_TIME_ARGS (position));
SPLITMUX_PART_UNLOCK (part);
@ -1186,6 +1212,7 @@ gst_splitmux_part_reader_change_state (GstElement * element,
break;
case GST_STATE_CHANGE_PAUSED_TO_READY:
do_async_done (reader);
splitmux_part_reader_reset (reader);
break;
case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
SPLITMUX_PART_LOCK (reader);
@ -1196,7 +1223,6 @@ gst_splitmux_part_reader_change_state (GstElement * element,
break;
case GST_STATE_CHANGE_READY_TO_NULL:
reader->prep_state = PART_STATE_NULL;
splitmux_part_reader_reset (reader);
break;
default:
break;
@ -1219,12 +1245,52 @@ gst_splitmux_part_reader_prepare (GstSplitMuxPartReader * part)
return TRUE;
}
static gboolean
gst_splitmux_part_reader_prepare_sync (GstSplitMuxPartReader * reader)
{
GstStateChangeReturn ret;
ret = gst_element_set_state (GST_ELEMENT_CAST (reader), GST_STATE_PAUSED);
if (ret == GST_STATE_CHANGE_FAILURE)
return FALSE;
if (ret == GST_STATE_CHANGE_ASYNC) {
SPLITMUX_PART_LOCK (reader);
while (reader->running && reader->prep_state != PART_STATE_READY) {
if (reader->prep_state == PART_STATE_FAILED) {
SPLITMUX_PART_UNLOCK (reader);
return FALSE;
}
GST_LOG_OBJECT (reader,
"Waiting for prepare (or failure) on reader %s", reader->path);
SPLITMUX_PART_WAIT (reader);
}
SPLITMUX_PART_UNLOCK (reader);
}
return TRUE;
}
void
gst_splitmux_part_reader_unprepare (GstSplitMuxPartReader * part)
{
gst_element_set_state (GST_ELEMENT_CAST (part), GST_STATE_NULL);
}
gboolean
gst_splitmux_part_reader_is_running (GstSplitMuxPartReader * part)
{
gboolean ret;
SPLITMUX_PART_LOCK (part);
ret = part->running;
SPLITMUX_PART_UNLOCK (part);
return ret;
}
void
gst_splitmux_part_reader_set_location (GstSplitMuxPartReader * reader,
const gchar * path)
@ -1238,6 +1304,11 @@ gst_splitmux_part_reader_activate (GstSplitMuxPartReader * reader,
{
GST_DEBUG_OBJECT (reader, "Activating part reader");
if (!gst_splitmux_part_reader_prepare_sync (reader)) {
GST_ERROR_OBJECT (reader, "Failed to prepare part before activation");
return FALSE;
}
if (!gst_splitmux_part_reader_seek_to_segment (reader, seg, extra_flags)) {
GST_ERROR_OBJECT (reader, "Failed to seek part to %" GST_SEGMENT_FORMAT,
seg);
@ -1270,6 +1341,13 @@ gst_splitmux_part_reader_deactivate (GstSplitMuxPartReader * reader)
gst_element_set_state (GST_ELEMENT_CAST (reader), GST_STATE_PAUSED);
}
void
gst_splitmux_part_reader_stop (GstSplitMuxPartReader * reader)
{
GST_DEBUG_OBJECT (reader, "Stopping reader tasks");
gst_element_set_state (GST_ELEMENT_CAST (reader), GST_STATE_READY);
}
void
gst_splitmux_part_reader_set_flushing_locked (GstSplitMuxPartReader * reader,
gboolean flushing)
@ -1297,16 +1375,10 @@ gst_splitmux_part_reader_set_callbacks (GstSplitMuxPartReader * reader,
GstClockTime
gst_splitmux_part_reader_get_end_offset (GstSplitMuxPartReader * reader)
{
GList *cur;
GstClockTime ret = GST_CLOCK_TIME_NONE;
SPLITMUX_PART_LOCK (reader);
for (cur = g_list_first (reader->pads); cur != NULL; cur = g_list_next (cur)) {
GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data);
if (!part_pad->is_sparse && part_pad->max_ts < ret)
ret = part_pad->max_ts;
}
ret = reader->end_offset;
SPLITMUX_PART_UNLOCK (reader);
return ret;
@ -1317,7 +1389,7 @@ gst_splitmux_part_reader_set_start_offset (GstSplitMuxPartReader * reader,
GstClockTime time_offset, GstClockTime ts_offset)
{
SPLITMUX_PART_LOCK (reader);
reader->start_offset = time_offset;
reader->info.start_offset = time_offset;
reader->ts_offset = ts_offset;
GST_INFO_OBJECT (reader, "Time offset now %" GST_TIME_FORMAT,
GST_TIME_ARGS (time_offset));
@ -1330,7 +1402,7 @@ gst_splitmux_part_reader_get_start_offset (GstSplitMuxPartReader * reader)
GstClockTime ret = GST_CLOCK_TIME_NONE;
SPLITMUX_PART_LOCK (reader);
ret = reader->start_offset;
ret = reader->info.start_offset;
SPLITMUX_PART_UNLOCK (reader);
return ret;
@ -1342,7 +1414,7 @@ gst_splitmux_part_reader_get_duration (GstSplitMuxPartReader * reader)
GstClockTime dur;
SPLITMUX_PART_LOCK (reader);
dur = reader->duration;
dur = reader->info.duration;
SPLITMUX_PART_UNLOCK (reader);
return dur;
@ -1388,6 +1460,7 @@ gst_splitmux_part_reader_pop (GstSplitMuxPartReader * reader, GstPad * pad,
/* Have to drop the lock around pop, so we can be woken up for flush */
SPLITMUX_PART_UNLOCK (reader);
if (!gst_data_queue_pop (q, item) || (*item == NULL)) {
GST_LOG_OBJECT (part_pad, "Popped null item -> flushing");
ret = GST_FLOW_FLUSHING;
goto out;
}
@ -1398,8 +1471,10 @@ gst_splitmux_part_reader_pop (GstSplitMuxPartReader * reader, GstPad * pad,
if (GST_IS_EVENT ((*item)->object)) {
GstEvent *e = (GstEvent *) ((*item)->object);
/* Mark this pad as EOS */
if (GST_EVENT_TYPE (e) == GST_EVENT_EOS)
if (GST_EVENT_TYPE (e) == GST_EVENT_EOS) {
GST_LOG_OBJECT (part_pad, "popping EOS event");
part_pad->is_eos = TRUE;
}
}
SPLITMUX_PART_UNLOCK (reader);

View file

@ -37,6 +37,7 @@ G_BEGIN_DECLS
typedef struct _GstSplitMuxPartReader GstSplitMuxPartReader;
typedef struct _GstSplitMuxPartReaderClass GstSplitMuxPartReaderClass;
typedef struct _GstSplitMuxPartReaderInfo GstSplitMuxPartReaderInfo;
typedef struct _SplitMuxSrcPad SplitMuxSrcPad;
typedef struct _SplitMuxSrcPadClass SplitMuxSrcPadClass;
@ -52,11 +53,18 @@ typedef enum
typedef GstPad *(*GstSplitMuxPartReaderPadCb)(GstSplitMuxPartReader *reader, GstPad *src_pad, gpointer cb_data);
struct _GstSplitMuxPartReaderInfo
{
GstClockTime duration;
GstClockTime start_offset;
};
struct _GstSplitMuxPartReader
{
GstPipeline parent;
GstSplitMuxPartState prep_state;
gboolean need_duration_measuring;
gchar *path;
@ -71,9 +79,10 @@ struct _GstSplitMuxPartReader
gboolean flushing;
gboolean no_more_pads;
GstClockTime duration;
GstClockTime start_offset;
GstSplitMuxPartReaderInfo info;
GstClockTime ts_offset;
GstClockTime end_offset;
GList *pads;
@ -100,6 +109,7 @@ void gst_splitmux_part_reader_set_callbacks (GstSplitMuxPartReader *reader,
gpointer cb_data, GstSplitMuxPartReaderPadCb get_pad_cb);
gboolean gst_splitmux_part_reader_prepare (GstSplitMuxPartReader *part);
void gst_splitmux_part_reader_unprepare (GstSplitMuxPartReader *part);
gboolean gst_splitmux_part_reader_is_running (GstSplitMuxPartReader *part);
void gst_splitmux_part_reader_set_location (GstSplitMuxPartReader *reader,
const gchar *path);
gboolean gst_splitmux_part_is_eos (GstSplitMuxPartReader *reader);
@ -108,6 +118,8 @@ gboolean gst_splitmux_part_reader_activate (GstSplitMuxPartReader *part, GstSegm
void gst_splitmux_part_reader_deactivate (GstSplitMuxPartReader *part);
gboolean gst_splitmux_part_reader_is_active (GstSplitMuxPartReader *part);
void gst_splitmux_part_reader_stop (GstSplitMuxPartReader *part);
gboolean gst_splitmux_part_reader_src_query (GstSplitMuxPartReader *part, GstPad *src_pad, GstQuery * query);
void gst_splitmux_part_reader_set_start_offset (GstSplitMuxPartReader *part, GstClockTime time_offset, GstClockTime ts_offset);
GstClockTime gst_splitmux_part_reader_get_start_offset (GstSplitMuxPartReader *part);

View file

@ -60,9 +60,12 @@ GST_DEBUG_CATEGORY (splitmux_debug);
enum
{
PROP_0,
PROP_LOCATION
PROP_LOCATION,
PROP_NUM_OPEN_FRAGMENTS
};
#define DEFAULT_OPEN_FRAGMENTS 0
enum
{
SIGNAL_FORMAT_LOCATION,
@ -234,6 +237,13 @@ gst_splitmux_src_class_init (GstSplitMuxSrcClass * klass)
g_param_spec_string ("location", "File Input Pattern",
"Glob pattern for the location of the files to read", NULL,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_NUM_OPEN_FRAGMENTS,
g_param_spec_uint ("num-open-fragments", "Open files limit",
"Number of files to keep open simultaneously. "
"(0 = open all fragments at the start). "
"May still use slightly more if set to less than the number of streams in the files",
0, G_MAXUINT, DEFAULT_OPEN_FRAGMENTS,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
/**
* GstSplitMuxSrc::format-location:
@ -257,6 +267,7 @@ gst_splitmux_src_init (GstSplitMuxSrc * splitmux)
g_rw_lock_init (&splitmux->pads_rwlock);
splitmux->total_duration = GST_CLOCK_TIME_NONE;
gst_segment_init (&splitmux->play_segment, GST_FORMAT_TIME);
splitmux->target_max_readers = DEFAULT_OPEN_FRAGMENTS;
}
static void
@ -305,6 +316,11 @@ gst_splitmux_src_set_property (GObject * object, guint prop_id,
GST_OBJECT_UNLOCK (splitmux);
break;
}
case PROP_NUM_OPEN_FRAGMENTS:
GST_OBJECT_LOCK (splitmux);
splitmux->target_max_readers = g_value_get_uint (value);
GST_OBJECT_UNLOCK (splitmux);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
@ -323,6 +339,11 @@ gst_splitmux_src_get_property (GObject * object, guint prop_id,
g_value_set_string (value, splitmux->location);
GST_OBJECT_UNLOCK (splitmux);
break;
case PROP_NUM_OPEN_FRAGMENTS:
GST_OBJECT_LOCK (splitmux);
g_value_set_uint (value, splitmux->target_max_readers);
GST_OBJECT_UNLOCK (splitmux);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
@ -433,9 +454,6 @@ gst_splitmux_part_bus_handler (GstBus * bus, GstMessage * msg,
gboolean need_no_more_pads;
if (idx >= splitmux->num_parts) {
/* Shouldn't really happen! */
do_async_done (splitmux);
g_warn_if_reached ();
break;
}
@ -544,7 +562,7 @@ gst_splitmux_part_bus_handler (GstBus * bus, GstMessage * msg,
}
static GstSplitMuxPartReader *
gst_splitmux_part_create (GstSplitMuxSrc * splitmux, char *filename)
gst_splitmux_part_reader_create (GstSplitMuxSrc * splitmux, char *filename)
{
GstSplitMuxPartReader *r;
GstBus *bus;
@ -830,20 +848,69 @@ flushing:
return;
}
static void
reduce_active_readers (GstSplitMuxSrc * splitmux)
{
/* Try and reduce the active reader count by deactivating the
* oldest reader if it's no longer in use */
if (splitmux->target_max_readers == 0) {
return;
}
while (g_queue_get_length (splitmux->active_parts) >=
splitmux->target_max_readers) {
GstSplitMuxPartReader *oldest_reader =
g_queue_peek_head (splitmux->active_parts);
if (gst_splitmux_part_reader_is_active (oldest_reader)) {
return;
}
GST_DEBUG_OBJECT (splitmux, "Stopping least recently used part %s",
oldest_reader->path);
oldest_reader = g_queue_pop_head (splitmux->active_parts);
gst_splitmux_part_reader_stop (oldest_reader);
g_object_unref (oldest_reader);
}
}
static void
add_to_active_readers (GstSplitMuxSrc * splitmux,
GstSplitMuxPartReader * reader)
{
if (splitmux->target_max_readers != 0) {
/* Check if it's already in the active reader pool, and move this reader
* to the tail, or else add a ref and push it on the tail */
if (gst_splitmux_part_reader_is_running (reader)) {
/* Already in the queue, and reffed, move it to the end without
* adding another ref */
gboolean in_queue = g_queue_remove (splitmux->active_parts, reader);
g_assert (in_queue == TRUE);
} else {
/* Putting it in the queue. Add a ref */
g_object_ref (reader);
/* When adding a new reader to the list, reduce active readers first */
reduce_active_readers (splitmux);
}
g_queue_push_tail (splitmux->active_parts, reader);
}
}
static gboolean
gst_splitmux_src_activate_part (GstSplitMuxSrc * splitmux, guint part,
GstSeekFlags extra_flags)
{
GList *cur;
GST_DEBUG_OBJECT (splitmux, "Activating part %d", part);
GstSplitMuxPartReader *reader = splitmux->parts[part];
splitmux->cur_part = part;
if (!gst_splitmux_part_reader_activate (splitmux->parts[part],
&splitmux->play_segment, extra_flags))
add_to_active_readers (splitmux, reader);
if (!gst_splitmux_part_reader_activate (reader,
&splitmux->play_segment, extra_flags)) {
return FALSE;
}
SPLITMUX_SRC_PADS_RLOCK (splitmux);
GList *cur;
for (cur = g_list_first (splitmux->pads);
cur != NULL; cur = g_list_next (cur)) {
SplitMuxSrcPad *splitpad = (SplitMuxSrcPad *) (cur->data);
@ -879,6 +946,8 @@ gst_splitmux_src_prepare_next_part (GstSplitMuxSrc * splitmux)
GST_DEBUG_OBJECT (splitmux, "Preparing file part %s (%u)",
splitmux->parts[idx]->path, idx);
add_to_active_readers (splitmux, splitmux->parts[idx]);
gst_splitmux_part_reader_set_start_offset (splitmux->parts[idx],
splitmux->end_offset, FIXED_TS_OFFSET);
if (!gst_splitmux_part_reader_prepare (splitmux->parts[idx])) {
@ -939,13 +1008,15 @@ gst_splitmux_src_start (GstSplitMuxSrc * splitmux)
splitmux->running = TRUE;
SPLITMUX_SRC_UNLOCK (splitmux);
splitmux->active_parts = g_queue_new ();
splitmux->num_parts = g_strv_length (files);
splitmux->parts = g_new0 (GstSplitMuxPartReader *, splitmux->num_parts);
/* Create all part pipelines */
for (i = 0; i < splitmux->num_parts; i++) {
splitmux->parts[i] = gst_splitmux_part_create (splitmux, files[i]);
splitmux->parts[i] = gst_splitmux_part_reader_create (splitmux, files[i]);
if (splitmux->parts[i] == NULL)
break;
}
@ -1008,16 +1079,12 @@ gst_splitmux_src_stop (GstSplitMuxSrc * splitmux)
splitmux->running = FALSE;
GST_DEBUG_OBJECT (splitmux, "Stopping");
SPLITMUX_SRC_UNLOCK (splitmux);
/* Stop all part readers. We don't need the lock here,
* because all parts were created in _start() */
/* Stop all part readers. */
for (i = 0; i < splitmux->num_created_parts; i++) {
if (splitmux->parts[i] == NULL)
continue;
gst_splitmux_part_reader_unprepare (splitmux->parts[i]);
}
SPLITMUX_SRC_LOCK (splitmux);
SPLITMUX_SRC_PADS_WLOCK (splitmux);
pads_list = splitmux->pads;
@ -1034,6 +1101,8 @@ gst_splitmux_src_stop (GstSplitMuxSrc * splitmux)
SPLITMUX_SRC_LOCK (splitmux);
/* Now the pad task is stopped we can destroy the readers */
g_queue_free_full (splitmux->active_parts, g_object_unref);
for (i = 0; i < splitmux->num_created_parts; i++) {
if (splitmux->parts[i] == NULL)
continue;
@ -1047,6 +1116,7 @@ gst_splitmux_src_stop (GstSplitMuxSrc * splitmux)
splitmux->num_prepared_parts = 0;
splitmux->num_created_parts = 0;
splitmux->total_duration = GST_CLOCK_TIME_NONE;
/* Reset playback segment */
gst_segment_init (&splitmux->play_segment, GST_FORMAT_TIME);
out:
@ -1087,7 +1157,6 @@ gst_splitmux_find_output_pad (GstSplitMuxPartReader * part, GstPad * pad,
GstPad *target = NULL;
gboolean is_new_pad = FALSE;
SPLITMUX_SRC_LOCK (splitmux);
SPLITMUX_SRC_PADS_WLOCK (splitmux);
for (cur = g_list_first (splitmux->pads);
cur != NULL; cur = g_list_next (cur)) {
@ -1116,7 +1185,6 @@ gst_splitmux_find_output_pad (GstSplitMuxPartReader * part, GstPad * pad,
is_new_pad = TRUE;
}
SPLITMUX_SRC_PADS_WUNLOCK (splitmux);
SPLITMUX_SRC_UNLOCK (splitmux);
g_free (pad_name);
@ -1212,8 +1280,11 @@ gst_splitmux_end_of_part (GstSplitMuxSrc * splitmux, SplitMuxSrcPad * splitpad)
SPLITMUX_SRC_LOCK (splitmux);
/* If all pads are done with this part, deactivate it */
if (gst_splitmux_part_is_eos (splitmux->parts[splitpad->cur_part]))
if (gst_splitmux_part_is_eos (splitmux->parts[splitpad->cur_part])) {
GST_DEBUG_OBJECT (splitmux, "All pads in part %d finished. Deactivating it",
cur_part);
gst_splitmux_part_reader_deactivate (splitmux->parts[cur_part]);
}
if (splitmux->play_segment.rate >= 0.0) {
if (splitmux->play_segment.stop != -1) {
@ -1244,11 +1315,6 @@ gst_splitmux_end_of_part (GstSplitMuxSrc * splitmux, SplitMuxSrcPad * splitpad)
" moving to part %d", splitpad, next_part);
splitpad->cur_part = next_part;
splitpad->reader = splitmux->parts[splitpad->cur_part];
if (splitpad->part_pad)
gst_object_unref (splitpad->part_pad);
splitpad->part_pad =
gst_splitmux_part_reader_lookup_pad (splitpad->reader,
(GstPad *) (splitpad));
if (splitmux->cur_part != next_part) {
if (!gst_splitmux_part_reader_is_active (splitpad->reader)) {
@ -1264,12 +1330,22 @@ gst_splitmux_end_of_part (GstSplitMuxSrc * splitmux, SplitMuxSrcPad * splitpad)
GST_DEBUG_OBJECT (splitpad,
"First pad to change part. Activating part %d with seg %"
GST_SEGMENT_FORMAT, next_part, &tmp);
add_to_active_readers (splitmux, splitpad->reader);
if (!gst_splitmux_part_reader_activate (splitpad->reader, &tmp,
GST_SEEK_FLAG_NONE))
GST_SEEK_FLAG_NONE)) {
goto error;
}
}
splitmux->cur_part = next_part;
}
if (splitpad->part_pad)
gst_object_unref (splitpad->part_pad);
splitpad->part_pad =
gst_splitmux_part_reader_lookup_pad (splitpad->reader,
(GstPad *) (splitpad));
res = TRUE;
}
@ -1424,12 +1500,15 @@ splitmux_src_pad_event (GstPad * pad, GstObject * parent, GstEvent * event)
splitmux->segment_seqnum = seqnum;
/* Work out where to start from now */
for (i = 0; i < splitmux->num_parts; i++) {
GstSplitMuxPartReader *reader = splitmux->parts[i];
GstClockTime part_end =
gst_splitmux_part_reader_get_end_offset (reader);
for (i = 0; i < splitmux->num_parts - 1; i++) {
GstSplitMuxPartReader *reader = splitmux->parts[i + 1];
GstClockTime part_start =
gst_splitmux_part_reader_get_start_offset (reader);
if (part_end > position)
GST_LOG_OBJECT (splitmux, "Part %d has start offset %" GST_TIMEP_FORMAT
" (want position %" GST_TIMEP_FORMAT ")", i, &part_start,
&position);
if (position < part_start)
break;
}
if (i == splitmux->num_parts)

View file

@ -67,6 +67,9 @@ struct _GstSplitMuxSrc
GstClockTime end_offset;
GstSegment play_segment;
guint32 segment_seqnum;
guint target_max_readers; /* Maximum number of readers we try to keep open */
GQueue *active_parts;
};
struct _GstSplitMuxSrcClass