splitmuxsrc: Add add-fragment signal and examples

Add a signal that allows adding fragments with a specific offset
and duration directly to splitmuxsrc's list. By providing the
fragment's offset on the playback timeline and duration directly,
splitmuxsrc doesn't need to measure the fragment making for faster
startup times.

Add a bus message that's published when fragments are measured,
reporting the offset and duration, so they can be cached by an
application and used on future invocations.

Add examples for handling the bus message and using the 'add-fragment'
signal.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/7053>
This commit is contained in:
Jan Schmidt 2024-05-03 23:41:03 +10:00
parent 1821b52dd5
commit 682db96a41
9 changed files with 604 additions and 129 deletions

View file

@ -312,6 +312,17 @@ splitmux_part_is_prerolled_locked (GstSplitMuxPartReader * part)
return TRUE;
}
gboolean
gst_splitmux_part_reader_needs_measuring (GstSplitMuxPartReader * reader)
{
gboolean res;
SPLITMUX_PART_LOCK (reader);
res = reader->need_duration_measuring;
SPLITMUX_PART_UNLOCK (reader);
return res;
}
gboolean
gst_splitmux_part_is_eos (GstSplitMuxPartReader * reader)
@ -688,6 +699,7 @@ gst_splitmux_part_reader_init (GstSplitMuxPartReader * reader)
reader->need_duration_measuring = TRUE;
reader->active = FALSE;
reader->info.start_offset = GST_CLOCK_TIME_NONE;
reader->info.duration = GST_CLOCK_TIME_NONE;
g_cond_init (&reader->inactive_cond);
@ -960,6 +972,8 @@ gst_splitmux_part_reader_finish_measuring_streams (GstSplitMuxPartReader *
SPLITMUX_PART_LOCK (reader);
if (reader->prep_state == PART_STATE_PREPARING_RESET_FOR_READY) {
GstClockTime end_offset = GST_CLOCK_TIME_NONE;
gboolean done_measuring = FALSE;
GstSplitMuxPartReaderInfo info;
/* Fire the prepared signal and go to READY state */
reader->prep_state = PART_STATE_READY;
@ -981,10 +995,17 @@ gst_splitmux_part_reader_finish_measuring_streams (GstSplitMuxPartReader *
reader->end_offset = end_offset;
reader->need_duration_measuring = FALSE; // We won't re-measure this part
info = reader->info;
done_measuring = TRUE;
}
SPLITMUX_PART_BROADCAST (reader);
SPLITMUX_PART_UNLOCK (reader);
if (done_measuring && reader->measured_cb) {
reader->measured_cb (reader, reader->path, info.start_offset,
info.duration, reader->cb_data);
}
do_async_done (reader);
} else {
SPLITMUX_PART_UNLOCK (reader);
@ -1366,10 +1387,12 @@ gst_splitmux_part_reader_set_flushing_locked (GstSplitMuxPartReader * reader,
void
gst_splitmux_part_reader_set_callbacks (GstSplitMuxPartReader * reader,
gpointer cb_data, GstSplitMuxPartReaderPadCb get_pad_cb)
gpointer cb_data, GstSplitMuxPartReaderPadCb get_pad_cb,
GstSplitMuxPartReaderMeasuredCb measured_cb)
{
reader->cb_data = cb_data;
reader->get_pad_cb = get_pad_cb;
reader->measured_cb = measured_cb;
}
GstClockTime
@ -1396,6 +1419,19 @@ gst_splitmux_part_reader_set_start_offset (GstSplitMuxPartReader * reader,
SPLITMUX_PART_UNLOCK (reader);
}
void
gst_splitmux_part_reader_set_duration (GstSplitMuxPartReader * reader,
GstClockTime duration)
{
SPLITMUX_PART_LOCK (reader);
reader->info.duration = duration;
reader->need_duration_measuring = (duration == GST_CLOCK_TIME_NONE);
GST_INFO_OBJECT (reader, "Duration manually set to %" GST_TIME_FORMAT,
GST_TIME_ARGS (duration));
SPLITMUX_PART_UNLOCK (reader);
}
GstClockTime
gst_splitmux_part_reader_get_start_offset (GstSplitMuxPartReader * reader)
{

View file

@ -52,6 +52,7 @@ typedef enum
} GstSplitMuxPartState;
typedef GstPad *(*GstSplitMuxPartReaderPadCb)(GstSplitMuxPartReader *reader, GstPad *src_pad, gpointer cb_data);
typedef void (*GstSplitMuxPartReaderMeasuredCb)(GstSplitMuxPartReader *reader, const gchar *filename, GstClockTime offset, GstClockTime duration, gpointer cb_data);
struct _GstSplitMuxPartReaderInfo
{
@ -92,6 +93,7 @@ struct _GstSplitMuxPartReader
GMutex msg_lock;
GstSplitMuxPartReaderPadCb get_pad_cb;
GstSplitMuxPartReaderMeasuredCb measured_cb;
gpointer cb_data;
};
@ -106,12 +108,13 @@ struct _GstSplitMuxPartReaderClass
GType gst_splitmux_part_reader_get_type (void);
void gst_splitmux_part_reader_set_callbacks (GstSplitMuxPartReader *reader,
gpointer cb_data, GstSplitMuxPartReaderPadCb get_pad_cb);
gpointer cb_data, GstSplitMuxPartReaderPadCb get_pad_cb, GstSplitMuxPartReaderMeasuredCb measured_cb);
gboolean gst_splitmux_part_reader_prepare (GstSplitMuxPartReader *part);
void gst_splitmux_part_reader_unprepare (GstSplitMuxPartReader *part);
gboolean gst_splitmux_part_reader_is_running (GstSplitMuxPartReader *part);
void gst_splitmux_part_reader_set_location (GstSplitMuxPartReader *reader,
const gchar *path);
gboolean gst_splitmux_part_reader_needs_measuring (GstSplitMuxPartReader *reader);
gboolean gst_splitmux_part_is_eos (GstSplitMuxPartReader *reader);
gboolean gst_splitmux_part_reader_activate (GstSplitMuxPartReader *part, GstSegment *seg, GstSeekFlags extra_flags);
@ -122,6 +125,7 @@ void gst_splitmux_part_reader_stop (GstSplitMuxPartReader *part);
gboolean gst_splitmux_part_reader_src_query (GstSplitMuxPartReader *part, GstPad *src_pad, GstQuery * query);
void gst_splitmux_part_reader_set_start_offset (GstSplitMuxPartReader *part, GstClockTime time_offset, GstClockTime ts_offset);
void gst_splitmux_part_reader_set_duration (GstSplitMuxPartReader *part, GstClockTime duration);
GstClockTime gst_splitmux_part_reader_get_start_offset (GstSplitMuxPartReader *part);
GstClockTime gst_splitmux_part_reader_get_end_offset (GstSplitMuxPartReader *part);
GstClockTime gst_splitmux_part_reader_get_duration (GstSplitMuxPartReader * reader);

View file

@ -69,6 +69,7 @@ enum
enum
{
SIGNAL_FORMAT_LOCATION,
SIGNAL_ADD_FRAGMENT,
SIGNAL_LAST
};
@ -117,16 +118,23 @@ static void splitmux_src_uri_handler_init (gpointer g_iface,
gpointer iface_data);
static void
gst_splitmux_part_measured_cb (GstSplitMuxPartReader * part,
const gchar * filename, GstClockTime offset, GstClockTime duration,
GstSplitMuxSrc * splitmux);
static GstPad *gst_splitmux_find_output_pad (GstSplitMuxPartReader * part,
GstPad * pad, GstSplitMuxSrc * splitmux);
static gboolean gst_splitmux_end_of_part (GstSplitMuxSrc * splitmux,
SplitMuxSrcPad * pad);
static gboolean gst_splitmux_check_new_caps (SplitMuxSrcPad * splitpad,
GstEvent * event);
static gboolean gst_splitmux_src_prepare_next_part (GstSplitMuxSrc * splitmux);
static gboolean gst_splitmux_src_measure_next_part (GstSplitMuxSrc * splitmux);
static gboolean gst_splitmux_src_activate_part (GstSplitMuxSrc * splitmux,
guint part, GstSeekFlags extra_flags);
static gboolean gst_splitmuxsrc_add_fragment (GstSplitMuxSrc * splitmux,
const gchar * filename, GstClockTime offset, GstClockTime duration);
#define _do_init \
G_IMPLEMENT_INTERFACE(GST_TYPE_URI_HANDLER, splitmux_src_uri_handler_init); \
GST_DEBUG_CATEGORY_INIT (splitmux_debug, "splitmuxsrc", 0, "Split File Demuxing Source");
@ -258,6 +266,31 @@ gst_splitmux_src_class_init (GstSplitMuxSrcClass * klass)
signals[SIGNAL_FORMAT_LOCATION] =
g_signal_new ("format-location", G_TYPE_FROM_CLASS (klass),
G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_STRV, 0);
/**
* GstSplitMuxSrc::add-fragment:
* @splitmux: the #GstSplitMuxSrc
* @filename: The fragment filename to add
* @offset: Playback offset for the fragment (can be #GST_CLOCK_TIME_NONE)
* @duration: Fragment nominal duration (can be #GST_CLOCK_TIME_NONE)
*
* Add a file fragment to the set of parts. If the offset and duration are provided,
* the file will be placed in the set immediately without loading the file to measure
* it.
*
* Returns: A boolean. TRUE if the fragment was successfully appended.
* FALSE on failure.
*
* Since: 1.24
*/
signals[SIGNAL_ADD_FRAGMENT] =
g_signal_new_class_handler ("add-fragment",
G_TYPE_FROM_CLASS (klass),
G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
G_CALLBACK (gst_splitmuxsrc_add_fragment),
NULL, NULL, NULL,
G_TYPE_BOOLEAN, 3, G_TYPE_STRING, GST_TYPE_CLOCK_TIME,
GST_TYPE_CLOCK_TIME);
}
static void
@ -434,6 +467,8 @@ gst_splitmux_src_activate_first_part (GstSplitMuxSrc * splitmux)
{
SPLITMUX_SRC_LOCK (splitmux);
if (splitmux->running) {
do_async_done (splitmux);
if (!gst_splitmux_src_activate_part (splitmux, 0, GST_SEEK_FLAG_NONE)) {
GST_ELEMENT_ERROR (splitmux, RESOURCE, OPEN_READ, (NULL),
("Failed to activate first part for playback"));
@ -442,6 +477,76 @@ gst_splitmux_src_activate_first_part (GstSplitMuxSrc * splitmux)
SPLITMUX_SRC_UNLOCK (splitmux);
}
static void
gst_splitmux_part_measured_cb (GstSplitMuxPartReader * part,
const gchar * filename, GstClockTime offset, GstClockTime duration,
GstSplitMuxSrc * splitmux)
{
guint idx = splitmux->num_measured_parts;
gboolean need_no_more_pads;
/* signal no-more-pads as we have all pads at this point now */
SPLITMUX_SRC_LOCK (splitmux);
need_no_more_pads = !splitmux->pads_complete;
splitmux->pads_complete = TRUE;
SPLITMUX_SRC_UNLOCK (splitmux);
if (need_no_more_pads) {
GST_DEBUG_OBJECT (splitmux, "Signalling no-more-pads");
gst_element_no_more_pads (GST_ELEMENT_CAST (splitmux));
}
if (idx >= splitmux->num_parts) {
return;
}
GST_DEBUG_OBJECT (splitmux, "Measured file part %s (%u)",
splitmux->parts[idx]->path, idx);
/* Post part measured info message */
GstMessage *msg = gst_message_new_element (GST_OBJECT (splitmux),
gst_structure_new ("splitmuxsrc-fragment-info",
"location", G_TYPE_STRING, filename,
"fragment-offset", GST_TYPE_CLOCK_TIME, offset,
"fragment-duration", GST_TYPE_CLOCK_TIME, duration,
NULL));
gst_element_post_message (GST_ELEMENT_CAST (splitmux), msg);
/* Extend our total duration to cover this part */
GST_OBJECT_LOCK (splitmux);
splitmux->total_duration +=
gst_splitmux_part_reader_get_duration (splitmux->parts[idx]);
splitmux->play_segment.duration = splitmux->total_duration;
GST_OBJECT_UNLOCK (splitmux);
splitmux->end_offset =
gst_splitmux_part_reader_get_end_offset (splitmux->parts[idx]);
GST_DEBUG_OBJECT (splitmux,
"Duration %" GST_TIME_FORMAT ", total duration now: %" GST_TIME_FORMAT
" and end offset %" GST_TIME_FORMAT,
GST_TIME_ARGS (gst_splitmux_part_reader_get_duration (splitmux->parts
[idx])), GST_TIME_ARGS (splitmux->total_duration),
GST_TIME_ARGS (splitmux->end_offset));
splitmux->num_measured_parts++;
/* If we're done or preparing the next part fails, finish here */
if (splitmux->num_measured_parts >= splitmux->num_parts
|| !gst_splitmux_src_measure_next_part (splitmux)) {
/* Store how many parts we actually prepared in the end */
splitmux->num_parts = splitmux->num_measured_parts;
/* All done preparing, activate the first part */
GST_INFO_OBJECT (splitmux,
"All parts measured. Total duration %" GST_TIME_FORMAT
" Activating first part", GST_TIME_ARGS (splitmux->total_duration));
gst_element_call_async (GST_ELEMENT_CAST (splitmux),
(GstElementCallAsyncFunc) gst_splitmux_src_activate_first_part,
NULL, NULL);
}
}
static GstBusSyncReply
gst_splitmux_part_bus_handler (GstBus * bus, GstMessage * msg,
gpointer user_data)
@ -450,70 +555,14 @@ gst_splitmux_part_bus_handler (GstBus * bus, GstMessage * msg,
switch (GST_MESSAGE_TYPE (msg)) {
case GST_MESSAGE_ASYNC_DONE:{
guint idx = splitmux->num_prepared_parts;
gboolean need_no_more_pads;
if (idx >= splitmux->num_parts) {
break;
}
GST_DEBUG_OBJECT (splitmux, "Prepared file part %s (%u)",
splitmux->parts[idx]->path, idx);
/* signal no-more-pads as we have all pads at this point now */
SPLITMUX_SRC_LOCK (splitmux);
need_no_more_pads = !splitmux->pads_complete;
splitmux->pads_complete = TRUE;
SPLITMUX_SRC_UNLOCK (splitmux);
if (need_no_more_pads) {
GST_DEBUG_OBJECT (splitmux, "Signalling no-more-pads");
gst_element_no_more_pads (GST_ELEMENT_CAST (splitmux));
}
/* Extend our total duration to cover this part */
GST_OBJECT_LOCK (splitmux);
splitmux->total_duration +=
gst_splitmux_part_reader_get_duration (splitmux->parts[idx]);
splitmux->play_segment.duration = splitmux->total_duration;
GST_OBJECT_UNLOCK (splitmux);
splitmux->end_offset =
gst_splitmux_part_reader_get_end_offset (splitmux->parts[idx]);
GST_DEBUG_OBJECT (splitmux,
"Duration %" GST_TIME_FORMAT ", total duration now: %" GST_TIME_FORMAT
" and end offset %" GST_TIME_FORMAT,
GST_TIME_ARGS (gst_splitmux_part_reader_get_duration (splitmux->parts
[idx])), GST_TIME_ARGS (splitmux->total_duration),
GST_TIME_ARGS (splitmux->end_offset));
splitmux->num_prepared_parts++;
/* If we're done or preparing the next part fails, finish here */
if (splitmux->num_prepared_parts >= splitmux->num_parts
|| !gst_splitmux_src_prepare_next_part (splitmux)) {
/* Store how many parts we actually prepared in the end */
splitmux->num_parts = splitmux->num_prepared_parts;
do_async_done (splitmux);
/* All done preparing, activate the first part */
GST_INFO_OBJECT (splitmux,
"All parts prepared. Total duration %" GST_TIME_FORMAT
" Activating first part", GST_TIME_ARGS (splitmux->total_duration));
gst_element_call_async (GST_ELEMENT_CAST (splitmux),
(GstElementCallAsyncFunc) gst_splitmux_src_activate_first_part,
NULL, NULL);
}
break;
}
case GST_MESSAGE_ERROR:{
GST_ERROR_OBJECT (splitmux,
"Got error message from part %" GST_PTR_FORMAT ": %" GST_PTR_FORMAT,
GST_MESSAGE_SRC (msg), msg);
if (splitmux->num_prepared_parts < splitmux->num_parts) {
guint idx = splitmux->num_prepared_parts;
if (splitmux->num_measured_parts < splitmux->num_parts) {
guint idx = splitmux->num_measured_parts;
if (idx == 0) {
GST_ERROR_OBJECT (splitmux,
@ -532,7 +581,7 @@ gst_splitmux_part_bus_handler (GstBus * bus, GstMessage * msg,
}
/* Store how many parts we actually prepared in the end */
splitmux->num_parts = splitmux->num_prepared_parts;
splitmux->num_parts = splitmux->num_measured_parts;
do_async_done (splitmux);
if (idx > 0) {
@ -562,7 +611,8 @@ gst_splitmux_part_bus_handler (GstBus * bus, GstMessage * msg,
}
static GstSplitMuxPartReader *
gst_splitmux_part_reader_create (GstSplitMuxSrc * splitmux, char *filename)
gst_splitmux_part_reader_create (GstSplitMuxSrc * splitmux,
const char *filename, gsize index)
{
GstSplitMuxPartReader *r;
GstBus *bus;
@ -570,7 +620,8 @@ gst_splitmux_part_reader_create (GstSplitMuxSrc * splitmux, char *filename)
r = g_object_new (GST_TYPE_SPLITMUX_PART_READER, NULL);
gst_splitmux_part_reader_set_callbacks (r, splitmux,
(GstSplitMuxPartReaderPadCb) gst_splitmux_find_output_pad);
(GstSplitMuxPartReaderPadCb) gst_splitmux_find_output_pad,
(GstSplitMuxPartReaderMeasuredCb) gst_splitmux_part_measured_cb);
gst_splitmux_part_reader_set_location (r, filename);
bus = gst_element_get_bus (GST_ELEMENT_CAST (r));
@ -671,7 +722,7 @@ gst_splitmux_handle_event (GstSplitMuxSrc * splitmux,
if (splitmux->play_segment.stop != -1)
seg.stop = splitmux->play_segment.stop + FIXED_TS_OFFSET;
else
seg.stop = splitpad->segment.stop;
seg.stop = -1;
} else {
/* Reverse playback from stop time to start time */
/* See if an end point was requested in the seek */
@ -899,15 +950,22 @@ gst_splitmux_src_activate_part (GstSplitMuxSrc * splitmux, guint part,
GstSeekFlags extra_flags)
{
GST_DEBUG_OBJECT (splitmux, "Activating part %d", part);
GstSplitMuxPartReader *reader = splitmux->parts[part];
GstSplitMuxPartReader *reader = gst_object_ref (splitmux->parts[part]);
splitmux->cur_part = part;
add_to_active_readers (splitmux, reader);
SPLITMUX_SRC_UNLOCK (splitmux);
/* Drop lock around calling activate, as it might call back
* into the splitmuxsrc when exposing pads */
if (!gst_splitmux_part_reader_activate (reader,
&splitmux->play_segment, extra_flags)) {
gst_object_unref (reader);
return FALSE;
}
gst_object_unref (reader);
SPLITMUX_SRC_LOCK (splitmux);
SPLITMUX_SRC_PADS_RLOCK (splitmux);
GList *cur;
@ -937,30 +995,66 @@ gst_splitmux_src_activate_part (GstSplitMuxSrc * splitmux, guint part,
}
static gboolean
gst_splitmux_src_prepare_next_part (GstSplitMuxSrc * splitmux)
gst_splitmux_src_measure_next_part (GstSplitMuxSrc * splitmux)
{
guint idx = splitmux->num_prepared_parts;
guint idx = splitmux->num_measured_parts;
g_assert (idx < splitmux->num_parts);
GST_DEBUG_OBJECT (splitmux, "Preparing file part %s (%u)",
splitmux->parts[idx]->path, idx);
GstClockTime end_offset = 0;
/* Take the end offset of the most recently measured part */
if (splitmux->num_measured_parts > 0) {
GstSplitMuxPartReader *reader =
splitmux->parts[splitmux->num_measured_parts - 1];
end_offset = gst_splitmux_part_reader_get_end_offset (reader);
}
add_to_active_readers (splitmux, splitmux->parts[idx]);
for (guint idx = splitmux->num_measured_parts; idx < splitmux->num_parts;
idx++) {
/* Walk forward until we find a part that needs measuring */
GstSplitMuxPartReader *reader = splitmux->parts[idx];
gst_splitmux_part_reader_set_start_offset (splitmux->parts[idx],
splitmux->end_offset, FIXED_TS_OFFSET);
if (!gst_splitmux_part_reader_prepare (splitmux->parts[idx])) {
GST_WARNING_OBJECT (splitmux,
"Failed to prepare file part %s. Cannot play past there.",
splitmux->parts[idx]->path);
GST_ELEMENT_WARNING (splitmux, RESOURCE, READ, (NULL),
("Failed to prepare file part %s. Cannot play past there.",
splitmux->parts[idx]->path));
gst_splitmux_part_reader_unprepare (splitmux->parts[idx]);
g_object_unref (splitmux->parts[idx]);
splitmux->parts[idx] = NULL;
return FALSE;
GstClockTime start_offset =
gst_splitmux_part_reader_get_start_offset (reader);
if (start_offset == GST_CLOCK_TIME_NONE) {
GST_DEBUG_OBJECT (splitmux,
"Setting start offset for file part %s (%u) to %" GST_TIMEP_FORMAT,
reader->path, idx, &end_offset);
gst_splitmux_part_reader_set_start_offset (reader, end_offset,
FIXED_TS_OFFSET);
}
if (gst_splitmux_part_reader_needs_measuring (reader)) {
GST_DEBUG_OBJECT (splitmux, "Measuring file part %s (%u)",
reader->path, idx);
add_to_active_readers (splitmux, reader);
SPLITMUX_SRC_UNLOCK (splitmux);
if (!gst_splitmux_part_reader_prepare (reader)) {
GST_WARNING_OBJECT (splitmux,
"Failed to prepare file part %s. Cannot play past there.",
reader->path);
GST_ELEMENT_WARNING (splitmux, RESOURCE, READ, (NULL),
("Failed to prepare file part %s. Cannot play past there.",
reader->path));
gst_splitmux_part_reader_unprepare (reader);
g_object_unref (reader);
SPLITMUX_SRC_LOCK (splitmux);
splitmux->parts[idx] = NULL;
splitmux->num_measured_parts = idx;
return FALSE;
}
SPLITMUX_SRC_LOCK (splitmux);
return TRUE;
}
/* Get the end offset (start offset of the next piece) */
end_offset = gst_splitmux_part_reader_get_end_offset (reader);
splitmux->total_duration += gst_splitmux_part_reader_get_duration (reader);
splitmux->num_measured_parts++;
}
return TRUE;
@ -973,7 +1067,7 @@ gst_splitmux_src_start (GstSplitMuxSrc * splitmux)
GError *err = NULL;
gchar *basename = NULL;
gchar *dirname = NULL;
gchar **files;
gchar **files = NULL;
guint i;
SPLITMUX_SRC_LOCK (splitmux);
@ -982,48 +1076,55 @@ gst_splitmux_src_start (GstSplitMuxSrc * splitmux)
SPLITMUX_SRC_UNLOCK (splitmux);
return FALSE;
}
SPLITMUX_SRC_UNLOCK (splitmux);
GST_DEBUG_OBJECT (splitmux, "Starting");
g_signal_emit (splitmux, signals[SIGNAL_FORMAT_LOCATION], 0, &files);
if (files == NULL || *files == NULL) {
GST_OBJECT_LOCK (splitmux);
if (splitmux->location != NULL && splitmux->location[0] != '\0') {
basename = g_path_get_basename (splitmux->location);
dirname = g_path_get_dirname (splitmux->location);
}
GST_OBJECT_UNLOCK (splitmux);
g_strfreev (files);
files = gst_split_util_find_files (dirname, basename, &err);
if (files == NULL || *files == NULL)
goto no_files;
}
SPLITMUX_SRC_LOCK (splitmux);
splitmux->pads_complete = FALSE;
splitmux->running = TRUE;
SPLITMUX_SRC_UNLOCK (splitmux);
splitmux->active_parts = g_queue_new ();
splitmux->num_parts = g_strv_length (files);
if (splitmux->num_parts == 0) {
/* No parts were added via add-fragment signal, try via
* format-location signal and location property glob */
g_signal_emit (splitmux, signals[SIGNAL_FORMAT_LOCATION], 0, &files);
splitmux->parts = g_new0 (GstSplitMuxPartReader *, splitmux->num_parts);
if (files == NULL || *files == NULL) {
GST_OBJECT_LOCK (splitmux);
if (splitmux->location != NULL && splitmux->location[0] != '\0') {
basename = g_path_get_basename (splitmux->location);
dirname = g_path_get_dirname (splitmux->location);
}
GST_OBJECT_UNLOCK (splitmux);
/* Create all part pipelines */
for (i = 0; i < splitmux->num_parts; i++) {
splitmux->parts[i] = gst_splitmux_part_reader_create (splitmux, files[i]);
if (splitmux->parts[i] == NULL)
break;
g_strfreev (files);
files = gst_split_util_find_files (dirname, basename, &err);
if (files == NULL || *files == NULL) {
SPLITMUX_SRC_UNLOCK (splitmux);
goto no_files;
}
}
}
/* Store how many parts we actually created */
splitmux->num_created_parts = splitmux->num_parts = i;
splitmux->num_prepared_parts = 0;
splitmux->pads_complete = FALSE;
splitmux->running = TRUE;
if (files != NULL) {
g_assert (splitmux->parts == NULL);
splitmux->num_parts_alloced = g_strv_length (files);
splitmux->parts =
g_new0 (GstSplitMuxPartReader *, splitmux->num_parts_alloced);
/* Create all part pipelines */
for (i = 0; i < splitmux->num_parts_alloced; i++) {
splitmux->parts[i] =
gst_splitmux_part_reader_create (splitmux, files[i], i);
if (splitmux->parts[i] == NULL)
break;
}
/* Store how many parts we actually created */
splitmux->num_parts = i;
}
splitmux->num_measured_parts = 0;
/* Update total_duration state variable */
GST_OBJECT_LOCK (splitmux);
@ -1031,11 +1132,24 @@ gst_splitmux_src_start (GstSplitMuxSrc * splitmux)
splitmux->end_offset = 0;
GST_OBJECT_UNLOCK (splitmux);
/* Then start the first: it will asynchronously go to PAUSED
/* Ensure all the parts we have are measured.
* Start the first: it will asynchronously go to PAUSED
* or error out and then we can proceed with the next one
*/
if (!gst_splitmux_src_prepare_next_part (splitmux) || splitmux->num_parts < 1)
if (!gst_splitmux_src_measure_next_part (splitmux) || splitmux->num_parts < 1) {
SPLITMUX_SRC_UNLOCK (splitmux);
goto failed_part;
}
if (splitmux->num_measured_parts >= splitmux->num_parts) {
/* Nothing needed measuring, activate the first part */
GST_INFO_OBJECT (splitmux,
"All parts measured. Total duration %" GST_TIME_FORMAT
" Activating first part", GST_TIME_ARGS (splitmux->total_duration));
gst_element_call_async (GST_ELEMENT_CAST (splitmux),
(GstElementCallAsyncFunc) gst_splitmux_src_activate_first_part,
NULL, NULL);
}
SPLITMUX_SRC_UNLOCK (splitmux);
/* All good now: we have to wait for all parts to be asynchronously
* prepared to know the total duration we can play */
@ -1080,7 +1194,7 @@ gst_splitmux_src_stop (GstSplitMuxSrc * splitmux)
GST_DEBUG_OBJECT (splitmux, "Stopping");
/* Stop all part readers. */
for (i = 0; i < splitmux->num_created_parts; i++) {
for (i = 0; i < splitmux->num_parts; i++) {
if (splitmux->parts[i] == NULL)
continue;
gst_splitmux_part_reader_unprepare (splitmux->parts[i]);
@ -1103,7 +1217,7 @@ gst_splitmux_src_stop (GstSplitMuxSrc * splitmux)
/* Now the pad task is stopped we can destroy the readers */
g_queue_free_full (splitmux->active_parts, g_object_unref);
for (i = 0; i < splitmux->num_created_parts; i++) {
for (i = 0; i < splitmux->num_parts; i++) {
if (splitmux->parts[i] == NULL)
continue;
g_object_unref (splitmux->parts[i]);
@ -1113,8 +1227,8 @@ gst_splitmux_src_stop (GstSplitMuxSrc * splitmux)
g_free (splitmux->parts);
splitmux->parts = NULL;
splitmux->num_parts = 0;
splitmux->num_prepared_parts = 0;
splitmux->num_created_parts = 0;
splitmux->num_measured_parts = 0;
splitmux->num_parts_alloced = 0;
splitmux->total_duration = GST_CLOCK_TIME_NONE;
/* Reset playback segment */
@ -1637,3 +1751,38 @@ splitmux_src_pad_query (GstPad * pad, GstObject * parent, GstQuery * query)
}
return ret;
}
static gboolean
gst_splitmuxsrc_add_fragment (GstSplitMuxSrc * splitmux,
const gchar * filename, GstClockTime offset, GstClockTime duration)
{
SPLITMUX_SRC_LOCK (splitmux);
/* Ensure we have enough space in the parts array, reallocating if necessary */
if (splitmux->num_parts == splitmux->num_parts_alloced) {
gsize to_alloc = splitmux->num_parts_alloced;
to_alloc = MAX (to_alloc + 8, 3 * to_alloc / 2);
splitmux->parts =
g_renew (GstSplitMuxPartReader *, splitmux->parts, to_alloc);
/* Zero newly allocated memory */
for (gsize i = splitmux->num_parts_alloced; i < to_alloc; i++) {
splitmux->parts[i] = NULL;
}
splitmux->num_parts_alloced = to_alloc;
}
GstSplitMuxPartReader *reader =
gst_splitmux_part_reader_create (splitmux, filename, splitmux->num_parts);
if (GST_CLOCK_TIME_IS_VALID (offset)) {
gst_splitmux_part_reader_set_start_offset (reader, offset, FIXED_TS_OFFSET);
}
if (GST_CLOCK_TIME_IS_VALID (duration)) {
gst_splitmux_part_reader_set_duration (reader, duration);
}
splitmux->parts[splitmux->num_parts] = reader;
splitmux->num_parts++;
SPLITMUX_SRC_UNLOCK (splitmux);
return TRUE;
}

View file

@ -51,8 +51,8 @@ struct _GstSplitMuxSrc
GstSplitMuxPartReader **parts;
guint num_parts;
guint num_prepared_parts;
guint num_created_parts;
guint num_parts_alloced;
guint num_measured_parts;
guint cur_part;
gboolean async_pending;

View file

@ -87,7 +87,7 @@ main (int argc, char **argv)
/* Connect to prepare signal */
g_signal_connect (data.reader, "prepared", (GCallback) part_prepared, &data);
gst_splitmux_part_reader_set_callbacks (data.reader, &data,
(GstSplitMuxPartReaderPadCb) handle_get_pad);
(GstSplitMuxPartReaderPadCb) handle_get_pad, NULL);
g_idle_add ((GSourceFunc) start_reader, &data);

View file

@ -14,6 +14,7 @@ subdir('shapewipe')
if have_v4l2
subdir('v4l2')
endif
subdir('splitmux')
if gtk_dep.found()
subdir('equalizer')

View file

@ -0,0 +1,11 @@
executable('splitmuxsrc-extract', 'splitmuxsrc-extract.c',
dependencies: [gst_dep],
c_args : gst_plugins_good_args,
include_directories : [configinc],
install: false)
executable('splitmuxsrc-add-fragment', 'splitmuxsrc-add-fragment.c',
dependencies: [gst_dep],
c_args : gst_plugins_good_args,
include_directories : [configinc],
install: false)

View file

@ -0,0 +1,139 @@
/* GStreamer
* Copyright (C) 2024 Jan Schmidt <jan@centricular.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
* Boston, MA 02110-1301, USA.
*/
/*
* This example uses splitmuxsrc to play a set of splitmuxed-files,
* by reading the set of files and their playback offsets from a CSV
* file generated by splitmuxsink-fragment-info or splitmuxsrc-extract
* and providing them to splitmuxsrc via the `add-fragment` signal.
*/
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <gst/gst.h>
GMainLoop *loop;
gchar **fragment_lines;
static gboolean
message_handler (GstBus * bus, GstMessage * message, gpointer data)
{
if (message->type == GST_MESSAGE_ERROR) {
GError *err;
gchar *debug_info;
gst_message_parse_error (message, &err, &debug_info);
g_printerr ("Error received from element %s: %s\n",
GST_OBJECT_NAME (message->src), err->message);
g_printerr ("Debugging information: %s\n",
debug_info ? debug_info : "none");
g_main_loop_quit (loop);
}
if (message->type == GST_MESSAGE_EOS) {
g_main_loop_quit (loop);
}
return TRUE;
}
static void
setup_splitmuxsrc (GstElement * playbin, GstElement * src, gpointer userdata)
{
/* Read fragment info from csv lines and add to splitmuxsrc */
gsize i = 0;
for (i = 0; fragment_lines[i] != NULL; i++) {
gchar *fragment = fragment_lines[i];
if (fragment[0] == '\0')
continue;
gchar *fname = NULL;
GstClockTime start_offset, duration;
const char *fmt = "\"%m[^\"]\",%" G_GUINT64_FORMAT ",%" G_GUINT64_FORMAT;
int ret = sscanf (fragment, fmt, &fname, &start_offset, &duration);
if (ret != 3) {
g_printerr ("failed to parse line %" G_GSIZE_FORMAT ": %s\n", i,
fragment);
g_main_loop_quit (loop);
return;
}
#if 0
g_print ("Adding fragment \"%s\",%" G_GUINT64_FORMAT ",%" G_GUINT64_FORMAT
"\n", fname, start_offset, duration);
#endif
gboolean add_result = FALSE;
g_signal_emit_by_name (G_OBJECT (src), "add-fragment", fname, start_offset,
duration, &add_result);
if (!add_result) {
g_printerr ("Failed to add fragment %" G_GSIZE_FORMAT ": %s\n", i, fname);
g_main_loop_quit (loop);
return;
}
free (fname);
}
}
int
main (int argc, char *argv[])
{
GstElement *pipe;
GstBus *bus;
gst_init (&argc, &argv);
if (argc < 2) {
g_printerr
("Usage: %s fragments.csv\n Pass a fragment info csv (from splitmuxsrc-extract) with fragment info to load\n",
argv[0]);
return 1;
}
GError *err = NULL;
gchar *fragment_info;
if (!g_file_get_contents (argv[1], &fragment_info, NULL, &err)) {
g_printerr ("Failed to open fragment info file %s. Error %s", argv[1],
err->message);
g_clear_error (&err);
return 2;
}
fragment_lines = g_strsplit (fragment_info, "\n", 0);
g_free (fragment_info);
pipe = gst_element_factory_make ("playbin3", NULL);
/* Connect to source-setup to set fragments on splitmuxsrc */
g_signal_connect (pipe, "source-setup", G_CALLBACK (setup_splitmuxsrc), NULL);
g_object_set (pipe, "uri", "splitmux://", NULL);
bus = gst_element_get_bus (pipe);
gst_bus_add_watch (bus, message_handler, NULL);
gst_object_unref (bus);
gst_element_set_state (pipe, GST_STATE_PLAYING);
loop = g_main_loop_new (NULL, FALSE);
g_main_loop_run (loop);
gst_element_set_state (pipe, GST_STATE_NULL);
gst_object_unref (pipe);
return 0;
}

View file

@ -0,0 +1,135 @@
/* GStreamer
* Copyright (C) 2024 Jan Schmidt <jan@centricular.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
* Boston, MA 02110-1301, USA.
*/
/*
* This example uses splitmuxsrc to play a set of splitmuxed-files,
* listening for the fragment-info messages from splitmuxsrc
* and writing a CSV file with the fragment offsets and durations
*/
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <gst/gst.h>
GMainLoop *loop;
FILE *out_csv = NULL;
gsize num_fragments = 0;
static gboolean
message_handler (GstBus * bus, GstMessage * message, gpointer data)
{
GstElement *pipe = data;
if (message->type == GST_MESSAGE_ELEMENT) {
const GstStructure *s = gst_message_get_structure (message);
const gchar *name = gst_structure_get_name (s);
if (strcmp (name, "splitmuxsrc-fragment-info") == 0) {
const gchar *fname = gst_structure_get_string (s, "location");
GstClockTime start_offset, duration;
if (!gst_structure_get_uint64 (s, "fragment-offset", &start_offset) ||
!gst_structure_get_uint64 (s, "fragment-duration", &duration)) {
g_assert_not_reached ();
}
fprintf (out_csv, "\"%s\",%" G_GUINT64_FORMAT ",%" G_GUINT64_FORMAT "\n",
fname, start_offset, duration);
num_fragments++;
}
} else if (message->type == GST_MESSAGE_EOS) {
g_main_loop_quit (loop);
} else if (message->type == GST_MESSAGE_STATE_CHANGED) {
GstState old_state, new_state, pending_state;
gst_message_parse_state_changed (message, &old_state, &new_state,
&pending_state);
if (GST_MESSAGE_SRC (message) == GST_OBJECT (pipe)
&& new_state == GST_STATE_PLAYING) {
g_print ("splitmuxsrc scanned %" G_GSIZE_FORMAT " files. Exiting\n",
num_fragments);
g_main_loop_quit (loop);
}
}
return TRUE;
}
static void
on_pad_added (GstElement * src, GstPad * pad, GstBin * pipe)
{
GstElement *sink = gst_element_factory_make ("fakesink", NULL);
gst_bin_add (pipe, sink);
GstPad *sinkpad = gst_element_get_static_pad (sink, "sink");
gst_pad_link (pad, sinkpad);
gst_object_unref (sinkpad);
gst_element_sync_state_with_parent (sink);
}
int
main (int argc, char *argv[])
{
GstElement *pipe;
GstElement *src;
GstBus *bus;
gst_init (&argc, &argv);
if (argc < 3) {
g_printerr
("Usage: %s *.mp4 out.csv\n Pass splitmux file glob and fragment info will be dumped to out.csv\n",
argv[0]);
return 1;
}
out_csv = fopen (argv[2], "w");
if (out_csv == NULL) {
g_printerr ("Failed to open output file %s", argv[2]);
return 2;
}
pipe = gst_pipeline_new (NULL);
src = gst_element_factory_make ("splitmuxsrc", "src");
g_assert (src != NULL);
/* Set the files glob on src */
g_object_set (src, "location", argv[1], NULL);
/* Connect to pad-added to attach fakesink elements */
g_signal_connect (src, "pad-added", G_CALLBACK (on_pad_added), pipe);
gst_bin_add (GST_BIN (pipe), src);
bus = gst_element_get_bus (pipe);
gst_bus_add_watch (bus, message_handler, pipe);
gst_object_unref (bus);
gst_element_set_state (pipe, GST_STATE_PLAYING);
loop = g_main_loop_new (NULL, FALSE);
g_main_loop_run (loop);
fclose (out_csv);
gst_element_set_state (pipe, GST_STATE_NULL);
gst_object_unref (pipe);
return 0;
}