gstreamer/subprojects/gst-plugins-good/gst/multifile/gstsplitmuxpartreader.c
Mathieu Duponchelle bda25f31a7 splitmuxsrc: don't consider unlinked pads when deactivating part
If splitmuxsrc exposes multiple pads, but only one is linked, part pads
will never see an EOS event. This shouldn't prevent the part from being
eventually deactivated.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/3148>
2022-10-08 11:08:41 +00:00

1378 lines
40 KiB
C

/* GStreamer Split Demuxer bin that recombines files created by
* the splitmuxsink element.
*
* Copyright (C) <2014> Jan Schmidt <jan@centricular.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
* Boston, MA 02110-1301, USA.
*/
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#include <string.h>
#include "gstsplitmuxsrc.h"
GST_DEBUG_CATEGORY_STATIC (splitmux_part_debug);
#define GST_CAT_DEFAULT splitmux_part_debug
#define SPLITMUX_PART_LOCK(p) g_mutex_lock(&(p)->lock)
#define SPLITMUX_PART_UNLOCK(p) g_mutex_unlock(&(p)->lock)
#define SPLITMUX_PART_WAIT(p) g_cond_wait (&(p)->inactive_cond, &(p)->lock)
#define SPLITMUX_PART_BROADCAST(p) g_cond_broadcast (&(p)->inactive_cond)
#define SPLITMUX_PART_TYPE_LOCK(p) g_mutex_lock(&(p)->type_lock)
#define SPLITMUX_PART_TYPE_UNLOCK(p) g_mutex_unlock(&(p)->type_lock)
#define SPLITMUX_PART_MSG_LOCK(p) g_mutex_lock(&(p)->msg_lock)
#define SPLITMUX_PART_MSG_UNLOCK(p) g_mutex_unlock(&(p)->msg_lock)
typedef struct _GstSplitMuxPartPad
{
GstPad parent;
/* Reader we belong to */
GstSplitMuxPartReader *reader;
/* Output splitmuxsrc source pad */
GstPad *target;
GstDataQueue *queue;
gboolean is_eos;
gboolean flushing;
gboolean seen_buffer;
gboolean is_sparse;
GstClockTime max_ts;
GstSegment segment;
GstSegment orig_segment;
GstClockTime initial_ts_offset;
} GstSplitMuxPartPad;
typedef struct _GstSplitMuxPartPadClass
{
GstPadClass parent;
} GstSplitMuxPartPadClass;
static GType gst_splitmux_part_pad_get_type (void);
#define SPLITMUX_TYPE_PART_PAD gst_splitmux_part_pad_get_type()
#define SPLITMUX_PART_PAD_CAST(p) ((GstSplitMuxPartPad *)(p))
static void splitmux_part_pad_constructed (GObject * pad);
static void splitmux_part_pad_finalize (GObject * pad);
static void handle_buffer_measuring (GstSplitMuxPartReader * reader,
GstSplitMuxPartPad * part_pad, GstBuffer * buf);
static gboolean splitmux_data_queue_is_full_cb (GstDataQueue * queue,
guint visible, guint bytes, guint64 time, gpointer checkdata);
static void type_found (GstElement * typefind, guint probability,
GstCaps * caps, GstSplitMuxPartReader * reader);
static void check_if_pads_collected (GstSplitMuxPartReader * reader);
static void
gst_splitmux_part_reader_finish_measuring_streams (GstSplitMuxPartReader *
reader);
/* Called with reader lock held */
static gboolean
have_empty_queue (GstSplitMuxPartReader * reader)
{
GList *cur;
for (cur = g_list_first (reader->pads); cur != NULL; cur = g_list_next (cur)) {
GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data);
if (part_pad->is_eos) {
GST_LOG_OBJECT (part_pad, "Pad is EOS");
return TRUE;
}
if (gst_data_queue_is_empty (part_pad->queue)) {
GST_LOG_OBJECT (part_pad, "Queue is empty");
return TRUE;
}
}
return FALSE;
}
/* Called with reader lock held */
static gboolean
block_until_can_push (GstSplitMuxPartReader * reader)
{
while (reader->running) {
if (reader->flushing)
goto out;
if (reader->active && have_empty_queue (reader))
goto out;
GST_LOG_OBJECT (reader,
"Waiting for activation or empty queue on reader %s", reader->path);
SPLITMUX_PART_WAIT (reader);
}
GST_LOG_OBJECT (reader, "Done waiting on reader %s active %d flushing %d",
reader->path, reader->active, reader->flushing);
out:
return reader->active && !reader->flushing;
}
static void
handle_buffer_measuring (GstSplitMuxPartReader * reader,
GstSplitMuxPartPad * part_pad, GstBuffer * buf)
{
GstClockTimeDiff ts = GST_CLOCK_STIME_NONE;
GstClockTimeDiff offset;
if (reader->prep_state == PART_STATE_PREPARING_COLLECT_STREAMS &&
!part_pad->seen_buffer) {
/* If this is the first buffer on the pad in the collect_streams state,
* then calculate initial offset based on running time of this segment */
part_pad->initial_ts_offset =
part_pad->orig_segment.start + part_pad->orig_segment.base -
part_pad->orig_segment.time;
GST_DEBUG_OBJECT (reader,
"Initial TS offset for pad %" GST_PTR_FORMAT " now %" GST_TIME_FORMAT,
part_pad, GST_TIME_ARGS (part_pad->initial_ts_offset));
}
part_pad->seen_buffer = TRUE;
/* Adjust buffer timestamps */
offset = reader->start_offset + part_pad->segment.base;
offset -= part_pad->initial_ts_offset;
/* We don't add the ts_offset here, because we
* want to measure the logical length of the stream,
* not to generate output timestamps */
/* Update the stored max duration on the pad,
* always preferring making DTS contiguous
* where possible */
if (GST_BUFFER_DTS_IS_VALID (buf))
ts = GST_BUFFER_DTS (buf) + offset;
else if (GST_BUFFER_PTS_IS_VALID (buf))
ts = GST_BUFFER_PTS (buf) + offset;
GST_DEBUG_OBJECT (reader, "Pad %" GST_PTR_FORMAT
" incoming DTS %" GST_TIME_FORMAT
" PTS %" GST_TIME_FORMAT " offset by %" GST_STIME_FORMAT
" to %" GST_STIME_FORMAT, part_pad,
GST_TIME_ARGS (GST_BUFFER_DTS (buf)),
GST_TIME_ARGS (GST_BUFFER_PTS (buf)),
GST_STIME_ARGS (offset), GST_STIME_ARGS (ts));
if (GST_CLOCK_STIME_IS_VALID (ts)) {
if (GST_BUFFER_DURATION_IS_VALID (buf))
ts += GST_BUFFER_DURATION (buf);
if (GST_CLOCK_STIME_IS_VALID (ts)
&& ts > (GstClockTimeDiff) part_pad->max_ts) {
part_pad->max_ts = ts;
GST_LOG_OBJECT (reader,
"pad %" GST_PTR_FORMAT " max TS now %" GST_TIME_FORMAT, part_pad,
GST_TIME_ARGS (part_pad->max_ts));
}
}
/* Is it time to move to measuring state yet? */
check_if_pads_collected (reader);
}
static gboolean
splitmux_data_queue_is_full_cb (GstDataQueue * queue,
guint visible, guint bytes, guint64 time, gpointer checkdata)
{
/* Arbitrary safety limit. If we hit it, playback is likely to stall */
if (time > 20 * GST_SECOND)
return TRUE;
return FALSE;
}
static void
splitmux_part_free_queue_item (GstDataQueueItem * item)
{
gst_mini_object_unref (item->object);
g_slice_free (GstDataQueueItem, item);
}
static GstFlowReturn
splitmux_part_pad_chain (GstPad * pad, GstObject * parent, GstBuffer * buf)
{
GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (pad);
GstSplitMuxPartReader *reader = part_pad->reader;
GstDataQueueItem *item;
GstClockTimeDiff offset;
GST_LOG_OBJECT (reader, "Pad %" GST_PTR_FORMAT " %" GST_PTR_FORMAT, pad, buf);
SPLITMUX_PART_LOCK (reader);
if (reader->prep_state == PART_STATE_PREPARING_COLLECT_STREAMS ||
reader->prep_state == PART_STATE_PREPARING_MEASURE_STREAMS) {
handle_buffer_measuring (reader, part_pad, buf);
gst_buffer_unref (buf);
SPLITMUX_PART_UNLOCK (reader);
return GST_FLOW_OK;
}
if (!block_until_can_push (reader)) {
/* Flushing */
SPLITMUX_PART_UNLOCK (reader);
gst_buffer_unref (buf);
return GST_FLOW_FLUSHING;
}
/* Adjust buffer timestamps */
offset = reader->start_offset + part_pad->segment.base;
offset -= part_pad->initial_ts_offset;
offset += reader->ts_offset;
if (GST_BUFFER_PTS_IS_VALID (buf))
GST_BUFFER_PTS (buf) += offset;
if (GST_BUFFER_DTS_IS_VALID (buf))
GST_BUFFER_DTS (buf) += offset;
/* We are active, and one queue is empty, place this buffer in
* the dataqueue */
GST_LOG_OBJECT (reader, "Enqueueing buffer %" GST_PTR_FORMAT, buf);
item = g_slice_new (GstDataQueueItem);
item->destroy = (GDestroyNotify) splitmux_part_free_queue_item;
item->object = GST_MINI_OBJECT (buf);
item->size = gst_buffer_get_size (buf);
item->duration = GST_BUFFER_DURATION (buf);
if (item->duration == GST_CLOCK_TIME_NONE)
item->duration = 0;
item->visible = TRUE;
gst_object_ref (part_pad);
SPLITMUX_PART_UNLOCK (reader);
if (!gst_data_queue_push (part_pad->queue, item)) {
splitmux_part_free_queue_item (item);
gst_object_unref (part_pad);
return GST_FLOW_FLUSHING;
}
gst_object_unref (part_pad);
return GST_FLOW_OK;
}
/* Called with splitmux part lock held */
static gboolean
splitmux_part_is_eos_locked (GstSplitMuxPartReader * part)
{
GList *cur;
for (cur = g_list_first (part->pads); cur != NULL; cur = g_list_next (cur)) {
GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data);
if (GST_PAD_LAST_FLOW_RETURN (part_pad->target) != GST_FLOW_NOT_LINKED
&& !part_pad->is_eos)
return FALSE;
}
return TRUE;
}
static gboolean
splitmux_part_is_prerolled_locked (GstSplitMuxPartReader * part)
{
GList *cur;
GST_LOG_OBJECT (part, "Checking for preroll");
for (cur = g_list_first (part->pads); cur != NULL; cur = g_list_next (cur)) {
GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data);
if (!part_pad->seen_buffer) {
GST_LOG_OBJECT (part, "Part pad %" GST_PTR_FORMAT " is not prerolled",
part_pad);
return FALSE;
}
}
GST_LOG_OBJECT (part, "Part is prerolled");
return TRUE;
}
gboolean
gst_splitmux_part_is_eos (GstSplitMuxPartReader * reader)
{
gboolean res;
SPLITMUX_PART_LOCK (reader);
res = splitmux_part_is_eos_locked (reader);
SPLITMUX_PART_UNLOCK (reader);
return res;
}
/* Called with splitmux part lock held */
static gboolean
splitmux_is_flushing (GstSplitMuxPartReader * reader)
{
GList *cur;
for (cur = g_list_first (reader->pads); cur != NULL; cur = g_list_next (cur)) {
GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data);
if (part_pad->flushing)
return TRUE;
}
return FALSE;
}
static gboolean
splitmux_part_pad_event (GstPad * pad, GstObject * parent, GstEvent * event)
{
GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (pad);
GstSplitMuxPartReader *reader = part_pad->reader;
gboolean ret = TRUE;
SplitMuxSrcPad *target;
GstDataQueueItem *item;
SPLITMUX_PART_LOCK (reader);
target = gst_object_ref (part_pad->target);
GST_LOG_OBJECT (reader, "Pad %" GST_PTR_FORMAT " event %" GST_PTR_FORMAT, pad,
event);
if (part_pad->flushing && GST_EVENT_TYPE (event) != GST_EVENT_FLUSH_STOP)
goto drop_event;
switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_STREAM_START:{
GstStreamFlags flags;
gst_event_parse_stream_flags (event, &flags);
part_pad->is_sparse = (flags & GST_STREAM_FLAG_SPARSE);
break;
}
case GST_EVENT_SEGMENT:{
GstSegment *seg = &part_pad->segment;
GST_LOG_OBJECT (pad, "Received segment %" GST_PTR_FORMAT, event);
gst_event_copy_segment (event, seg);
gst_event_copy_segment (event, &part_pad->orig_segment);
if (seg->format != GST_FORMAT_TIME)
goto wrong_segment;
/* Adjust segment */
/* Adjust start/stop so the overall file is 0 + start_offset based,
* adding a fixed offset so that DTS is never negative */
if (seg->stop != -1) {
seg->stop -= seg->start;
seg->stop += seg->time + reader->start_offset + reader->ts_offset;
}
seg->start = seg->time + reader->start_offset + reader->ts_offset;
seg->time += reader->start_offset;
seg->position += reader->start_offset;
/* Replace event */
gst_event_unref (event);
event = gst_event_new_segment (seg);
GST_LOG_OBJECT (pad, "Adjusted segment now %" GST_PTR_FORMAT, event);
if (reader->prep_state != PART_STATE_PREPARING_COLLECT_STREAMS
&& reader->prep_state != PART_STATE_PREPARING_MEASURE_STREAMS)
break; /* Only do further stuff with segments during initial measuring */
/* Take the first segment from the first part */
if (target->segment.format == GST_FORMAT_UNDEFINED) {
gst_segment_copy_into (seg, &target->segment);
GST_DEBUG_OBJECT (reader,
"Target pad segment now %" GST_SEGMENT_FORMAT, &target->segment);
}
if (seg->stop != -1 && target->segment.stop != -1) {
GstClockTime stop = seg->base + seg->stop;
if (stop > target->segment.stop) {
target->segment.stop = stop;
GST_DEBUG_OBJECT (reader,
"Adjusting segment stop by %" GST_TIME_FORMAT
" output now %" GST_SEGMENT_FORMAT,
GST_TIME_ARGS (reader->start_offset), &target->segment);
}
}
GST_LOG_OBJECT (pad, "Forwarding segment %" GST_PTR_FORMAT, event);
break;
}
case GST_EVENT_EOS:{
GST_DEBUG_OBJECT (part_pad,
"State %u EOS event. MaxTS seen %" GST_TIME_FORMAT,
reader->prep_state, GST_TIME_ARGS (part_pad->max_ts));
if (reader->prep_state == PART_STATE_PREPARING_COLLECT_STREAMS ||
reader->prep_state == PART_STATE_PREPARING_MEASURE_STREAMS) {
/* Mark this pad as EOS */
part_pad->is_eos = TRUE;
if (splitmux_part_is_eos_locked (reader)) {
/* Finished measuring things, set state and tell the state change func
* so it can seek back to the start */
GST_LOG_OBJECT (reader,
"EOS while measuring streams. Resetting for ready");
reader->prep_state = PART_STATE_PREPARING_RESET_FOR_READY;
gst_element_call_async (GST_ELEMENT_CAST (reader),
(GstElementCallAsyncFunc)
gst_splitmux_part_reader_finish_measuring_streams, NULL, NULL);
}
goto drop_event;
}
break;
}
case GST_EVENT_FLUSH_START:
reader->flushing = TRUE;
part_pad->flushing = TRUE;
GST_LOG_OBJECT (reader, "Pad %" GST_PTR_FORMAT " flushing dataqueue",
part_pad);
gst_data_queue_set_flushing (part_pad->queue, TRUE);
SPLITMUX_PART_BROADCAST (reader);
break;
case GST_EVENT_FLUSH_STOP:{
gst_data_queue_set_flushing (part_pad->queue, FALSE);
gst_data_queue_flush (part_pad->queue);
part_pad->seen_buffer = FALSE;
part_pad->flushing = FALSE;
part_pad->is_eos = FALSE;
reader->flushing = splitmux_is_flushing (reader);
GST_LOG_OBJECT (reader,
"%s pad %" GST_PTR_FORMAT " flush_stop. Overall flushing=%d",
reader->path, pad, reader->flushing);
SPLITMUX_PART_BROADCAST (reader);
break;
}
default:
break;
}
/* Don't send events downstream while preparing */
if (reader->prep_state != PART_STATE_READY)
goto drop_event;
/* Don't pass flush events - those are done by the parent */
if (GST_EVENT_TYPE (event) == GST_EVENT_FLUSH_START ||
GST_EVENT_TYPE (event) == GST_EVENT_FLUSH_STOP)
goto drop_event;
if (!block_until_can_push (reader))
goto drop_event;
switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_GAP:{
/* FIXME: Drop initial gap (if any) in each segment, not all GAPs */
goto drop_event;
}
default:
break;
}
/* We are active, and one queue is empty, place this buffer in
* the dataqueue */
gst_object_ref (part_pad->queue);
SPLITMUX_PART_UNLOCK (reader);
GST_LOG_OBJECT (reader, "Enqueueing event %" GST_PTR_FORMAT, event);
item = g_slice_new (GstDataQueueItem);
item->destroy = (GDestroyNotify) splitmux_part_free_queue_item;
item->object = GST_MINI_OBJECT (event);
item->size = 0;
item->duration = 0;
if (item->duration == GST_CLOCK_TIME_NONE)
item->duration = 0;
item->visible = FALSE;
if (!gst_data_queue_push (part_pad->queue, item)) {
splitmux_part_free_queue_item (item);
ret = FALSE;
}
gst_object_unref (part_pad->queue);
gst_object_unref (target);
return ret;
wrong_segment:
gst_event_unref (event);
gst_object_unref (target);
SPLITMUX_PART_UNLOCK (reader);
GST_ELEMENT_ERROR (reader, STREAM, FAILED, (NULL),
("Received non-time segment - reader %s pad %" GST_PTR_FORMAT,
reader->path, pad));
return FALSE;
drop_event:
GST_LOG_OBJECT (pad, "Dropping event %" GST_PTR_FORMAT
" from %" GST_PTR_FORMAT " on %" GST_PTR_FORMAT, event, pad, target);
gst_event_unref (event);
gst_object_unref (target);
SPLITMUX_PART_UNLOCK (reader);
return TRUE;
}
static gboolean
splitmux_part_pad_query (GstPad * pad, GstObject * parent, GstQuery * query)
{
GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (pad);
GstSplitMuxPartReader *reader = part_pad->reader;
GstPad *target;
gboolean ret = FALSE;
gboolean active;
SPLITMUX_PART_LOCK (reader);
target = gst_object_ref (part_pad->target);
active = reader->active;
SPLITMUX_PART_UNLOCK (reader);
if (active) {
GST_LOG_OBJECT (pad, "Forwarding query %" GST_PTR_FORMAT
" from %" GST_PTR_FORMAT " on %" GST_PTR_FORMAT, query, pad, target);
ret = gst_pad_query (target, query);
}
gst_object_unref (target);
return ret;
}
G_DEFINE_TYPE (GstSplitMuxPartPad, gst_splitmux_part_pad, GST_TYPE_PAD);
static void
splitmux_part_pad_constructed (GObject * pad)
{
gst_pad_set_chain_function (GST_PAD (pad),
GST_DEBUG_FUNCPTR (splitmux_part_pad_chain));
gst_pad_set_event_function (GST_PAD (pad),
GST_DEBUG_FUNCPTR (splitmux_part_pad_event));
gst_pad_set_query_function (GST_PAD (pad),
GST_DEBUG_FUNCPTR (splitmux_part_pad_query));
G_OBJECT_CLASS (gst_splitmux_part_pad_parent_class)->constructed (pad);
}
static void
gst_splitmux_part_pad_class_init (GstSplitMuxPartPadClass * klass)
{
GObjectClass *gobject_klass = (GObjectClass *) (klass);
gobject_klass->constructed = splitmux_part_pad_constructed;
gobject_klass->finalize = splitmux_part_pad_finalize;
}
static void
gst_splitmux_part_pad_init (GstSplitMuxPartPad * pad)
{
pad->queue = gst_data_queue_new (splitmux_data_queue_is_full_cb,
NULL, NULL, pad);
gst_segment_init (&pad->segment, GST_FORMAT_UNDEFINED);
gst_segment_init (&pad->orig_segment, GST_FORMAT_UNDEFINED);
}
static void
splitmux_part_pad_finalize (GObject * obj)
{
GstSplitMuxPartPad *pad = (GstSplitMuxPartPad *) (obj);
GST_DEBUG_OBJECT (obj, "finalize");
gst_data_queue_set_flushing (pad->queue, TRUE);
gst_data_queue_flush (pad->queue);
gst_object_unref (GST_OBJECT_CAST (pad->queue));
pad->queue = NULL;
G_OBJECT_CLASS (gst_splitmux_part_pad_parent_class)->finalize (obj);
}
static void
new_decoded_pad_added_cb (GstElement * element, GstPad * pad,
GstSplitMuxPartReader * part);
static void no_more_pads (GstElement * element, GstSplitMuxPartReader * reader);
static GstStateChangeReturn
gst_splitmux_part_reader_change_state (GstElement * element,
GstStateChange transition);
static gboolean gst_splitmux_part_reader_send_event (GstElement * element,
GstEvent * event);
static void gst_splitmux_part_reader_set_flushing_locked (GstSplitMuxPartReader
* part, gboolean flushing);
static void bus_handler (GstBin * bin, GstMessage * msg);
static void splitmux_part_reader_dispose (GObject * object);
static void splitmux_part_reader_finalize (GObject * object);
static void splitmux_part_reader_reset (GstSplitMuxPartReader * reader);
#define gst_splitmux_part_reader_parent_class parent_class
G_DEFINE_TYPE (GstSplitMuxPartReader, gst_splitmux_part_reader,
GST_TYPE_PIPELINE);
static void
gst_splitmux_part_reader_class_init (GstSplitMuxPartReaderClass * klass)
{
GObjectClass *gobject_klass = (GObjectClass *) (klass);
GstElementClass *gstelement_class = (GstElementClass *) klass;
GstBinClass *gstbin_class = (GstBinClass *) klass;
GST_DEBUG_CATEGORY_INIT (splitmux_part_debug, "splitmuxpartreader", 0,
"Split File Demuxing Source helper");
gobject_klass->dispose = splitmux_part_reader_dispose;
gobject_klass->finalize = splitmux_part_reader_finalize;
gstelement_class->change_state = gst_splitmux_part_reader_change_state;
gstelement_class->send_event = gst_splitmux_part_reader_send_event;
gstbin_class->handle_message = bus_handler;
}
static void
gst_splitmux_part_reader_init (GstSplitMuxPartReader * reader)
{
GstElement *typefind;
reader->active = FALSE;
reader->duration = GST_CLOCK_TIME_NONE;
g_cond_init (&reader->inactive_cond);
g_mutex_init (&reader->lock);
g_mutex_init (&reader->type_lock);
g_mutex_init (&reader->msg_lock);
/* FIXME: Create elements on a state change */
reader->src = gst_element_factory_make ("filesrc", NULL);
if (reader->src == NULL) {
GST_ERROR_OBJECT (reader, "Failed to create filesrc element");
return;
}
gst_bin_add (GST_BIN_CAST (reader), reader->src);
typefind = gst_element_factory_make ("typefind", NULL);
if (!typefind) {
GST_ERROR_OBJECT (reader,
"Failed to create typefind element - check your installation");
return;
}
gst_bin_add (GST_BIN_CAST (reader), typefind);
reader->typefind = typefind;
if (!gst_element_link_pads (reader->src, NULL, typefind, "sink")) {
GST_ERROR_OBJECT (reader,
"Failed to link typefind element - check your installation");
return;
}
g_signal_connect (reader->typefind, "have-type", G_CALLBACK (type_found),
reader);
}
static void
splitmux_part_reader_dispose (GObject * object)
{
GstSplitMuxPartReader *reader = (GstSplitMuxPartReader *) object;
splitmux_part_reader_reset (reader);
G_OBJECT_CLASS (parent_class)->dispose (object);
}
static void
splitmux_part_reader_finalize (GObject * object)
{
GstSplitMuxPartReader *reader = (GstSplitMuxPartReader *) object;
g_cond_clear (&reader->inactive_cond);
g_mutex_clear (&reader->lock);
g_mutex_clear (&reader->type_lock);
g_mutex_clear (&reader->msg_lock);
g_free (reader->path);
G_OBJECT_CLASS (parent_class)->finalize (object);
}
static void
do_async_start (GstSplitMuxPartReader * reader)
{
GstMessage *message;
SPLITMUX_PART_MSG_LOCK (reader);
reader->async_pending = TRUE;
message = gst_message_new_async_start (GST_OBJECT_CAST (reader));
GST_BIN_CLASS (parent_class)->handle_message (GST_BIN_CAST (reader), message);
SPLITMUX_PART_MSG_UNLOCK (reader);
}
static void
do_async_done (GstSplitMuxPartReader * reader)
{
GstMessage *message;
SPLITMUX_PART_MSG_LOCK (reader);
if (reader->async_pending) {
message =
gst_message_new_async_done (GST_OBJECT_CAST (reader),
GST_CLOCK_TIME_NONE);
GST_BIN_CLASS (parent_class)->handle_message (GST_BIN_CAST (reader),
message);
reader->async_pending = FALSE;
}
SPLITMUX_PART_MSG_UNLOCK (reader);
}
static void
splitmux_part_reader_reset (GstSplitMuxPartReader * reader)
{
GList *cur;
SPLITMUX_PART_LOCK (reader);
for (cur = g_list_first (reader->pads); cur != NULL; cur = g_list_next (cur)) {
GstPad *pad = GST_PAD_CAST (cur->data);
gst_pad_set_active (GST_PAD_CAST (pad), FALSE);
gst_object_unref (GST_OBJECT_CAST (pad));
}
g_list_free (reader->pads);
reader->pads = NULL;
SPLITMUX_PART_UNLOCK (reader);
}
static GstSplitMuxPartPad *
gst_splitmux_part_reader_new_proxy_pad (GstSplitMuxPartReader * reader,
GstPad * target)
{
GstSplitMuxPartPad *pad = g_object_new (SPLITMUX_TYPE_PART_PAD,
"name", GST_PAD_NAME (target),
"direction", GST_PAD_SINK,
NULL);
pad->target = target;
pad->reader = reader;
gst_pad_set_active (GST_PAD_CAST (pad), TRUE);
return pad;
}
static void
new_decoded_pad_added_cb (GstElement * element, GstPad * pad,
GstSplitMuxPartReader * reader)
{
GstPad *out_pad = NULL;
GstSplitMuxPartPad *proxy_pad;
GstCaps *caps;
GstPadLinkReturn link_ret;
caps = gst_pad_get_current_caps (pad);
GST_DEBUG_OBJECT (reader, "file %s new decoded pad %" GST_PTR_FORMAT
" caps %" GST_PTR_FORMAT, reader->path, pad, caps);
gst_caps_unref (caps);
/* Look up or create the output pad */
if (reader->get_pad_cb)
out_pad = reader->get_pad_cb (reader, pad, reader->cb_data);
if (out_pad == NULL) {
GST_DEBUG_OBJECT (reader,
"No output pad for %" GST_PTR_FORMAT ". Ignoring", pad);
return;
}
/* Create our proxy pad to interact with this new pad */
proxy_pad = gst_splitmux_part_reader_new_proxy_pad (reader, out_pad);
GST_DEBUG_OBJECT (reader,
"created proxy pad %" GST_PTR_FORMAT " for target %" GST_PTR_FORMAT,
proxy_pad, out_pad);
link_ret = gst_pad_link (pad, GST_PAD (proxy_pad));
if (link_ret != GST_PAD_LINK_OK) {
gst_object_unref (proxy_pad);
GST_ELEMENT_ERROR (reader, STREAM, FAILED, (NULL),
("Failed to link proxy pad for stream part %s pad %" GST_PTR_FORMAT
" ret %d", reader->path, pad, link_ret));
return;
}
GST_DEBUG_OBJECT (reader,
"new decoded pad %" GST_PTR_FORMAT " linked to %" GST_PTR_FORMAT,
pad, proxy_pad);
SPLITMUX_PART_LOCK (reader);
reader->pads = g_list_prepend (reader->pads, proxy_pad);
SPLITMUX_PART_UNLOCK (reader);
}
static gboolean
gst_splitmux_part_reader_send_event (GstElement * element, GstEvent * event)
{
GstSplitMuxPartReader *reader = (GstSplitMuxPartReader *) element;
gboolean ret = FALSE;
GstPad *pad = NULL;
/* Send event to the first source pad we found */
SPLITMUX_PART_LOCK (reader);
if (reader->pads) {
GstPad *proxy_pad = GST_PAD_CAST (reader->pads->data);
pad = gst_pad_get_peer (proxy_pad);
}
SPLITMUX_PART_UNLOCK (reader);
if (pad) {
ret = gst_pad_send_event (pad, event);
gst_object_unref (pad);
} else {
gst_event_unref (event);
}
return ret;
}
/* Called with lock held. Seeks to an 'internal' time from 0 to length of this piece */
static void
gst_splitmux_part_reader_seek_to_time_locked (GstSplitMuxPartReader * reader,
GstClockTime time)
{
SPLITMUX_PART_UNLOCK (reader);
GST_DEBUG_OBJECT (reader, "Seeking to time %" GST_TIME_FORMAT,
GST_TIME_ARGS (time));
gst_element_seek (GST_ELEMENT_CAST (reader), 1.0, GST_FORMAT_TIME,
GST_SEEK_FLAG_FLUSH | GST_SEEK_FLAG_ACCURATE, GST_SEEK_TYPE_SET, time,
GST_SEEK_TYPE_END, 0);
SPLITMUX_PART_LOCK (reader);
/* Wait for flush to finish, so old data is gone */
while (reader->flushing) {
GST_LOG_OBJECT (reader, "%s Waiting for flush to finish", reader->path);
SPLITMUX_PART_WAIT (reader);
}
}
/* Map the passed segment to 'internal' time from 0 to length of this piece and seek. Lock cannot be held */
static gboolean
gst_splitmux_part_reader_seek_to_segment (GstSplitMuxPartReader * reader,
GstSegment * target_seg, GstSeekFlags extra_flags)
{
GstSeekFlags flags;
GstClockTime start = 0, stop = GST_CLOCK_TIME_NONE;
flags = target_seg->flags | GST_SEEK_FLAG_FLUSH | extra_flags;
SPLITMUX_PART_LOCK (reader);
if (target_seg->start >= reader->start_offset)
start = target_seg->start - reader->start_offset;
/* If the segment stop is within this part, don't play to the end */
if (target_seg->stop != -1 &&
target_seg->stop < reader->start_offset + reader->duration)
stop = target_seg->stop - reader->start_offset;
SPLITMUX_PART_UNLOCK (reader);
GST_DEBUG_OBJECT (reader,
"Seeking rate %f format %d flags 0x%x start %" GST_TIME_FORMAT " stop %"
GST_TIME_FORMAT, target_seg->rate, target_seg->format, flags,
GST_TIME_ARGS (start), GST_TIME_ARGS (stop));
return gst_element_seek (GST_ELEMENT_CAST (reader), target_seg->rate,
target_seg->format, flags, GST_SEEK_TYPE_SET, start, GST_SEEK_TYPE_SET,
stop);
}
/* Called with lock held */
static void
gst_splitmux_part_reader_measure_streams (GstSplitMuxPartReader * reader)
{
SPLITMUX_PART_LOCK (reader);
/* Trigger a flushing seek to near the end of the file and run each stream
* to EOS in order to find the smallest end timestamp to start the next
* file from
*/
if (GST_CLOCK_TIME_IS_VALID (reader->duration)
&& reader->duration > GST_SECOND) {
GstClockTime seek_ts = reader->duration - (0.5 * GST_SECOND);
gst_splitmux_part_reader_seek_to_time_locked (reader, seek_ts);
}
SPLITMUX_PART_UNLOCK (reader);
}
static void
gst_splitmux_part_reader_finish_measuring_streams (GstSplitMuxPartReader *
reader)
{
SPLITMUX_PART_LOCK (reader);
if (reader->prep_state == PART_STATE_PREPARING_RESET_FOR_READY) {
/* Fire the prepared signal and go to READY state */
GST_DEBUG_OBJECT (reader,
"Stream measuring complete. File %s is now ready", reader->path);
reader->prep_state = PART_STATE_READY;
SPLITMUX_PART_UNLOCK (reader);
do_async_done (reader);
} else {
SPLITMUX_PART_UNLOCK (reader);
}
}
static GstElement *
find_demuxer (GstCaps * caps)
{
GList *factories =
gst_element_factory_list_get_elements (GST_ELEMENT_FACTORY_TYPE_DEMUXER,
GST_RANK_MARGINAL);
GList *compat_elements;
GstElement *e = NULL;
if (factories == NULL)
return NULL;
compat_elements =
gst_element_factory_list_filter (factories, caps, GST_PAD_SINK, TRUE);
if (compat_elements) {
/* Just take the first (highest ranked) option */
GstElementFactory *factory =
GST_ELEMENT_FACTORY_CAST (compat_elements->data);
e = gst_element_factory_create (factory, NULL);
gst_plugin_feature_list_free (compat_elements);
}
if (factories)
gst_plugin_feature_list_free (factories);
return e;
}
static void
type_found (GstElement * typefind, guint probability,
GstCaps * caps, GstSplitMuxPartReader * reader)
{
GstElement *demux;
GST_INFO_OBJECT (reader, "Got type %" GST_PTR_FORMAT, caps);
/* typefind found a type. Look for the demuxer to handle it */
demux = reader->demux = find_demuxer (caps);
if (reader->demux == NULL) {
GST_ERROR_OBJECT (reader, "Failed to create demuxer element");
return;
}
/* Connect to demux signals */
g_signal_connect (demux,
"pad-added", G_CALLBACK (new_decoded_pad_added_cb), reader);
g_signal_connect (demux, "no-more-pads", G_CALLBACK (no_more_pads), reader);
gst_element_set_locked_state (demux, TRUE);
gst_bin_add (GST_BIN_CAST (reader), demux);
gst_element_link_pads (reader->typefind, "src", demux, NULL);
gst_element_set_state (reader->demux, GST_STATE_TARGET (reader));
gst_element_set_locked_state (demux, FALSE);
}
static void
check_if_pads_collected (GstSplitMuxPartReader * reader)
{
if (reader->prep_state == PART_STATE_PREPARING_COLLECT_STREAMS) {
/* Check we have all pads and each pad has seen a buffer */
if (reader->no_more_pads && splitmux_part_is_prerolled_locked (reader)) {
GST_DEBUG_OBJECT (reader,
"no more pads - file %s. Measuring stream length", reader->path);
reader->prep_state = PART_STATE_PREPARING_MEASURE_STREAMS;
gst_element_call_async (GST_ELEMENT_CAST (reader),
(GstElementCallAsyncFunc) gst_splitmux_part_reader_measure_streams,
NULL, NULL);
}
}
}
static void
no_more_pads (GstElement * element, GstSplitMuxPartReader * reader)
{
GstClockTime duration = GST_CLOCK_TIME_NONE;
GList *cur;
/* Query the minimum duration of any pad in this piece and store it.
* FIXME: Only consider audio and video */
SPLITMUX_PART_LOCK (reader);
for (cur = g_list_first (reader->pads); cur != NULL; cur = g_list_next (cur)) {
GstPad *target = GST_PAD_CAST (cur->data);
if (target) {
gint64 cur_duration;
if (gst_pad_peer_query_duration (target, GST_FORMAT_TIME, &cur_duration)) {
GST_INFO_OBJECT (reader,
"file %s pad %" GST_PTR_FORMAT " duration %" GST_TIME_FORMAT,
reader->path, target, GST_TIME_ARGS (cur_duration));
if (cur_duration < duration)
duration = cur_duration;
}
}
}
GST_INFO_OBJECT (reader, "file %s duration %" GST_TIME_FORMAT,
reader->path, GST_TIME_ARGS (duration));
reader->duration = (GstClockTime) duration;
reader->no_more_pads = TRUE;
check_if_pads_collected (reader);
SPLITMUX_PART_UNLOCK (reader);
}
gboolean
gst_splitmux_part_reader_src_query (GstSplitMuxPartReader * part,
GstPad * src_pad, GstQuery * query)
{
GstPad *target = NULL;
gboolean ret;
GList *cur;
SPLITMUX_PART_LOCK (part);
/* Find the pad corresponding to the visible output target pad */
for (cur = g_list_first (part->pads); cur != NULL; cur = g_list_next (cur)) {
GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data);
if (part_pad->target == src_pad) {
target = gst_object_ref (GST_OBJECT_CAST (part_pad));
break;
}
}
SPLITMUX_PART_UNLOCK (part);
if (target == NULL)
return FALSE;
ret = gst_pad_peer_query (target, query);
if (ret == FALSE)
goto out;
/* Post-massaging of queries */
switch (GST_QUERY_TYPE (query)) {
case GST_QUERY_POSITION:{
GstFormat fmt;
gint64 position;
gst_query_parse_position (query, &fmt, &position);
if (fmt != GST_FORMAT_TIME)
return FALSE;
SPLITMUX_PART_LOCK (part);
position += part->start_offset;
GST_LOG_OBJECT (part, "Position %" GST_TIME_FORMAT,
GST_TIME_ARGS (position));
SPLITMUX_PART_UNLOCK (part);
gst_query_set_position (query, fmt, position);
break;
}
default:
break;
}
out:
gst_object_unref (target);
return ret;
}
static GstStateChangeReturn
gst_splitmux_part_reader_change_state (GstElement * element,
GstStateChange transition)
{
GstStateChangeReturn ret;
GstSplitMuxPartReader *reader = (GstSplitMuxPartReader *) element;
switch (transition) {
case GST_STATE_CHANGE_NULL_TO_READY:{
break;
}
case GST_STATE_CHANGE_READY_TO_PAUSED:{
SPLITMUX_PART_LOCK (reader);
g_object_set (reader->src, "location", reader->path, NULL);
reader->prep_state = PART_STATE_PREPARING_COLLECT_STREAMS;
gst_splitmux_part_reader_set_flushing_locked (reader, FALSE);
reader->running = TRUE;
SPLITMUX_PART_UNLOCK (reader);
/* we go to PAUSED asynchronously once all streams have been collected
* and seeks to measure the stream lengths are done */
do_async_start (reader);
break;
}
case GST_STATE_CHANGE_READY_TO_NULL:
case GST_STATE_CHANGE_PAUSED_TO_READY:
SPLITMUX_PART_LOCK (reader);
gst_splitmux_part_reader_set_flushing_locked (reader, TRUE);
reader->running = FALSE;
SPLITMUX_PART_BROADCAST (reader);
SPLITMUX_PART_UNLOCK (reader);
break;
case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
SPLITMUX_PART_LOCK (reader);
reader->active = FALSE;
gst_splitmux_part_reader_set_flushing_locked (reader, TRUE);
SPLITMUX_PART_BROADCAST (reader);
SPLITMUX_PART_UNLOCK (reader);
break;
default:
break;
}
ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
if (ret == GST_STATE_CHANGE_FAILURE) {
do_async_done (reader);
goto beach;
}
switch (transition) {
case GST_STATE_CHANGE_READY_TO_PAUSED:
ret = GST_STATE_CHANGE_ASYNC;
break;
case GST_STATE_CHANGE_PAUSED_TO_READY:
do_async_done (reader);
break;
case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
SPLITMUX_PART_LOCK (reader);
gst_splitmux_part_reader_set_flushing_locked (reader, FALSE);
reader->active = TRUE;
SPLITMUX_PART_BROADCAST (reader);
SPLITMUX_PART_UNLOCK (reader);
break;
case GST_STATE_CHANGE_READY_TO_NULL:
reader->prep_state = PART_STATE_NULL;
splitmux_part_reader_reset (reader);
break;
default:
break;
}
beach:
return ret;
}
gboolean
gst_splitmux_part_reader_prepare (GstSplitMuxPartReader * part)
{
GstStateChangeReturn ret;
ret = gst_element_set_state (GST_ELEMENT_CAST (part), GST_STATE_PAUSED);
if (ret == GST_STATE_CHANGE_FAILURE)
return FALSE;
return TRUE;
}
void
gst_splitmux_part_reader_unprepare (GstSplitMuxPartReader * part)
{
gst_element_set_state (GST_ELEMENT_CAST (part), GST_STATE_NULL);
}
void
gst_splitmux_part_reader_set_location (GstSplitMuxPartReader * reader,
const gchar * path)
{
reader->path = g_strdup (path);
}
gboolean
gst_splitmux_part_reader_activate (GstSplitMuxPartReader * reader,
GstSegment * seg, GstSeekFlags extra_flags)
{
GST_DEBUG_OBJECT (reader, "Activating part reader");
if (!gst_splitmux_part_reader_seek_to_segment (reader, seg, extra_flags)) {
GST_ERROR_OBJECT (reader, "Failed to seek part to %" GST_SEGMENT_FORMAT,
seg);
return FALSE;
}
if (gst_element_set_state (GST_ELEMENT_CAST (reader),
GST_STATE_PLAYING) == GST_STATE_CHANGE_FAILURE) {
GST_ERROR_OBJECT (reader, "Failed to set state to PLAYING");
return FALSE;
}
return TRUE;
}
gboolean
gst_splitmux_part_reader_is_active (GstSplitMuxPartReader * part)
{
gboolean ret;
SPLITMUX_PART_LOCK (part);
ret = part->active;
SPLITMUX_PART_UNLOCK (part);
return ret;
}
void
gst_splitmux_part_reader_deactivate (GstSplitMuxPartReader * reader)
{
GST_DEBUG_OBJECT (reader, "Deactivating reader");
gst_element_set_state (GST_ELEMENT_CAST (reader), GST_STATE_PAUSED);
}
void
gst_splitmux_part_reader_set_flushing_locked (GstSplitMuxPartReader * reader,
gboolean flushing)
{
GList *cur;
GST_LOG_OBJECT (reader, "%s dataqueues",
flushing ? "Flushing" : "Done flushing");
for (cur = g_list_first (reader->pads); cur != NULL; cur = g_list_next (cur)) {
GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data);
gst_data_queue_set_flushing (part_pad->queue, flushing);
if (flushing)
gst_data_queue_flush (part_pad->queue);
}
};
void
gst_splitmux_part_reader_set_callbacks (GstSplitMuxPartReader * reader,
gpointer cb_data, GstSplitMuxPartReaderPadCb get_pad_cb)
{
reader->cb_data = cb_data;
reader->get_pad_cb = get_pad_cb;
}
GstClockTime
gst_splitmux_part_reader_get_end_offset (GstSplitMuxPartReader * reader)
{
GList *cur;
GstClockTime ret = GST_CLOCK_TIME_NONE;
SPLITMUX_PART_LOCK (reader);
for (cur = g_list_first (reader->pads); cur != NULL; cur = g_list_next (cur)) {
GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data);
if (!part_pad->is_sparse && part_pad->max_ts < ret)
ret = part_pad->max_ts;
}
SPLITMUX_PART_UNLOCK (reader);
return ret;
}
void
gst_splitmux_part_reader_set_start_offset (GstSplitMuxPartReader * reader,
GstClockTime time_offset, GstClockTime ts_offset)
{
SPLITMUX_PART_LOCK (reader);
reader->start_offset = time_offset;
reader->ts_offset = ts_offset;
GST_INFO_OBJECT (reader, "Time offset now %" GST_TIME_FORMAT,
GST_TIME_ARGS (time_offset));
SPLITMUX_PART_UNLOCK (reader);
}
GstClockTime
gst_splitmux_part_reader_get_start_offset (GstSplitMuxPartReader * reader)
{
GstClockTime ret = GST_CLOCK_TIME_NONE;
SPLITMUX_PART_LOCK (reader);
ret = reader->start_offset;
SPLITMUX_PART_UNLOCK (reader);
return ret;
}
GstClockTime
gst_splitmux_part_reader_get_duration (GstSplitMuxPartReader * reader)
{
GstClockTime dur;
SPLITMUX_PART_LOCK (reader);
dur = reader->duration;
SPLITMUX_PART_UNLOCK (reader);
return dur;
}
GstPad *
gst_splitmux_part_reader_lookup_pad (GstSplitMuxPartReader * reader,
GstPad * target)
{
GstPad *result = NULL;
GList *cur;
SPLITMUX_PART_LOCK (reader);
for (cur = g_list_first (reader->pads); cur != NULL; cur = g_list_next (cur)) {
GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data);
if (part_pad->target == target) {
result = (GstPad *) gst_object_ref (part_pad);
break;
}
}
SPLITMUX_PART_UNLOCK (reader);
return result;
}
GstFlowReturn
gst_splitmux_part_reader_pop (GstSplitMuxPartReader * reader, GstPad * pad,
GstDataQueueItem ** item)
{
GstSplitMuxPartPad *part_pad = (GstSplitMuxPartPad *) (pad);
GstDataQueue *q;
GstFlowReturn ret;
/* Get one item from the appropriate dataqueue */
SPLITMUX_PART_LOCK (reader);
if (reader->prep_state == PART_STATE_FAILED) {
SPLITMUX_PART_UNLOCK (reader);
return GST_FLOW_ERROR;
}
q = gst_object_ref (part_pad->queue);
/* Have to drop the lock around pop, so we can be woken up for flush */
SPLITMUX_PART_UNLOCK (reader);
if (!gst_data_queue_pop (q, item) || (*item == NULL)) {
ret = GST_FLOW_FLUSHING;
goto out;
}
SPLITMUX_PART_LOCK (reader);
SPLITMUX_PART_BROADCAST (reader);
if (GST_IS_EVENT ((*item)->object)) {
GstEvent *e = (GstEvent *) ((*item)->object);
/* Mark this pad as EOS */
if (GST_EVENT_TYPE (e) == GST_EVENT_EOS)
part_pad->is_eos = TRUE;
}
SPLITMUX_PART_UNLOCK (reader);
ret = GST_FLOW_OK;
out:
gst_object_unref (q);
return ret;
}
static void
bus_handler (GstBin * bin, GstMessage * message)
{
GstSplitMuxPartReader *reader = (GstSplitMuxPartReader *) bin;
switch (GST_MESSAGE_TYPE (message)) {
case GST_MESSAGE_ERROR:
/* Make sure to set the state to failed and wake up the listener
* on error */
SPLITMUX_PART_LOCK (reader);
GST_ERROR_OBJECT (reader, "Got error message from child %" GST_PTR_FORMAT
" marking this reader as failed", GST_MESSAGE_SRC (message));
reader->prep_state = PART_STATE_FAILED;
SPLITMUX_PART_BROADCAST (reader);
SPLITMUX_PART_UNLOCK (reader);
do_async_done (reader);
break;
default:
break;
}
GST_BIN_CLASS (parent_class)->handle_message (bin, message);
}