mirror of
https://gitlab.freedesktop.org/gstreamer/gstreamer.git
synced 2024-11-10 11:29:55 +00:00
e7ec32801a
Don't hold the main splitmux part lock over the parent state change function, as it prevents posting error messages that happen. Since the purpose is to prevent typefinding from proceeding, use a separate mutex just for that.
1349 lines
40 KiB
C
1349 lines
40 KiB
C
/* GStreamer Split Demuxer bin that recombines files created by
|
|
* the splitmuxsink element.
|
|
*
|
|
* Copyright (C) <2014> Jan Schmidt <jan@centricular.com>
|
|
*
|
|
* This library is free software; you can redistribute it and/or
|
|
* modify it under the terms of the GNU Library General Public
|
|
* License as published by the Free Software Foundation; either
|
|
* version 2 of the License, or (at your option) any later version.
|
|
*
|
|
* This library is distributed in the hope that it will be useful,
|
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
|
* Library General Public License for more details.
|
|
*
|
|
* You should have received a copy of the GNU Library General Public
|
|
* License along with this library; if not, write to the
|
|
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
|
|
* Boston, MA 02110-1301, USA.
|
|
*/
|
|
|
|
#ifdef HAVE_CONFIG_H
|
|
#include "config.h"
|
|
#endif
|
|
|
|
#include <string.h>
|
|
#include "gstsplitmuxsrc.h"
|
|
|
|
GST_DEBUG_CATEGORY_STATIC (splitmux_part_debug);
|
|
#define GST_CAT_DEFAULT splitmux_part_debug
|
|
|
|
#define SPLITMUX_PART_LOCK(p) g_mutex_lock(&(p)->lock)
|
|
#define SPLITMUX_PART_UNLOCK(p) g_mutex_unlock(&(p)->lock)
|
|
#define SPLITMUX_PART_WAIT(p) g_cond_wait (&(p)->inactive_cond, &(p)->lock)
|
|
#define SPLITMUX_PART_BROADCAST(p) g_cond_broadcast (&(p)->inactive_cond)
|
|
|
|
#define SPLITMUX_PART_TYPE_LOCK(p) g_mutex_lock(&(p)->type_lock)
|
|
#define SPLITMUX_PART_TYPE_UNLOCK(p) g_mutex_unlock(&(p)->type_lock)
|
|
|
|
enum
|
|
{
|
|
SIGNAL_PREPARED,
|
|
LAST_SIGNAL
|
|
};
|
|
|
|
static guint part_reader_signals[LAST_SIGNAL] = { 0 };
|
|
|
|
typedef struct _GstSplitMuxPartPad
|
|
{
|
|
GstPad parent;
|
|
|
|
/* Reader we belong to */
|
|
GstSplitMuxPartReader *reader;
|
|
/* Output splitmuxsrc source pad */
|
|
GstPad *target;
|
|
|
|
GstDataQueue *queue;
|
|
|
|
gboolean is_eos;
|
|
gboolean flushing;
|
|
gboolean seen_buffer;
|
|
|
|
GstClockTime max_ts;
|
|
GstSegment segment;
|
|
|
|
GstSegment orig_segment;
|
|
GstClockTime initial_ts_offset;
|
|
} GstSplitMuxPartPad;
|
|
|
|
typedef struct _GstSplitMuxPartPadClass
|
|
{
|
|
GstPadClass parent;
|
|
} GstSplitMuxPartPadClass;
|
|
|
|
static GType gst_splitmux_part_pad_get_type (void);
|
|
#define SPLITMUX_TYPE_PART_PAD gst_splitmux_part_pad_get_type()
|
|
#define SPLITMUX_PART_PAD_CAST(p) ((GstSplitMuxPartPad *)(p))
|
|
|
|
static void splitmux_part_pad_constructed (GObject * pad);
|
|
static void splitmux_part_pad_finalize (GObject * pad);
|
|
static void handle_buffer_measuring (GstSplitMuxPartReader * reader,
|
|
GstSplitMuxPartPad * part_pad, GstBuffer * buf);
|
|
|
|
static gboolean splitmux_data_queue_is_full_cb (GstDataQueue * queue,
|
|
guint visible, guint bytes, guint64 time, gpointer checkdata);
|
|
static void type_found (GstElement * typefind, guint probability,
|
|
GstCaps * caps, GstSplitMuxPartReader * reader);
|
|
static void check_if_pads_collected (GstSplitMuxPartReader * reader);
|
|
|
|
/* Called with reader lock held */
|
|
static gboolean
|
|
have_empty_queue (GstSplitMuxPartReader * reader)
|
|
{
|
|
GList *cur;
|
|
|
|
for (cur = g_list_first (reader->pads); cur != NULL; cur = g_list_next (cur)) {
|
|
GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data);
|
|
if (part_pad->is_eos) {
|
|
GST_LOG_OBJECT (part_pad, "Pad is EOS");
|
|
return TRUE;
|
|
}
|
|
if (gst_data_queue_is_empty (part_pad->queue)) {
|
|
GST_LOG_OBJECT (part_pad, "Queue is empty");
|
|
return TRUE;
|
|
}
|
|
}
|
|
|
|
return FALSE;
|
|
}
|
|
|
|
/* Called with reader lock held */
|
|
static gboolean
|
|
block_until_can_push (GstSplitMuxPartReader * reader)
|
|
{
|
|
while (reader->running) {
|
|
if (reader->flushing)
|
|
goto out;
|
|
if (reader->active && have_empty_queue (reader))
|
|
goto out;
|
|
|
|
GST_LOG_OBJECT (reader,
|
|
"Waiting for activation or empty queue on reader %s", reader->path);
|
|
SPLITMUX_PART_WAIT (reader);
|
|
}
|
|
|
|
GST_LOG_OBJECT (reader, "Done waiting on reader %s active %d flushing %d",
|
|
reader->path, reader->active, reader->flushing);
|
|
out:
|
|
return reader->active && !reader->flushing;
|
|
}
|
|
|
|
static void
|
|
handle_buffer_measuring (GstSplitMuxPartReader * reader,
|
|
GstSplitMuxPartPad * part_pad, GstBuffer * buf)
|
|
{
|
|
GstClockTime ts = GST_CLOCK_TIME_NONE;
|
|
GstClockTimeDiff offset;
|
|
|
|
if (reader->prep_state == PART_STATE_PREPARING_COLLECT_STREAMS &&
|
|
!part_pad->seen_buffer) {
|
|
/* If this is the first buffer on the pad in the collect_streams state,
|
|
* then calculate inital offset based on running time of this segment */
|
|
part_pad->initial_ts_offset =
|
|
part_pad->orig_segment.start + part_pad->orig_segment.base -
|
|
part_pad->orig_segment.time;
|
|
GST_DEBUG_OBJECT (reader,
|
|
"Initial TS offset for pad %" GST_PTR_FORMAT " now %" GST_TIME_FORMAT,
|
|
part_pad, GST_TIME_ARGS (part_pad->initial_ts_offset));
|
|
}
|
|
part_pad->seen_buffer = TRUE;
|
|
|
|
/* Adjust buffer timestamps */
|
|
offset = reader->start_offset + part_pad->segment.base;
|
|
offset -= part_pad->initial_ts_offset;
|
|
|
|
/* Update the stored max duration on the pad,
|
|
* always preferring making DTS contiguous
|
|
* where possible */
|
|
if (GST_BUFFER_DTS_IS_VALID (buf))
|
|
ts = GST_BUFFER_DTS (buf) + offset;
|
|
else if (GST_BUFFER_PTS_IS_VALID (buf))
|
|
ts = GST_BUFFER_PTS (buf) + offset;
|
|
|
|
GST_DEBUG_OBJECT (reader, "Pad %" GST_PTR_FORMAT
|
|
" incoming PTS %" GST_TIME_FORMAT
|
|
" DTS %" GST_TIME_FORMAT " offset by %" GST_TIME_FORMAT
|
|
" to %" GST_TIME_FORMAT, part_pad,
|
|
GST_TIME_ARGS (GST_BUFFER_DTS (buf)),
|
|
GST_TIME_ARGS (GST_BUFFER_PTS (buf)),
|
|
GST_TIME_ARGS (offset), GST_TIME_ARGS (ts));
|
|
|
|
if (GST_CLOCK_TIME_IS_VALID (ts)) {
|
|
if (GST_BUFFER_DURATION_IS_VALID (buf))
|
|
ts += GST_BUFFER_DURATION (buf);
|
|
|
|
if (GST_CLOCK_TIME_IS_VALID (ts) && ts > part_pad->max_ts) {
|
|
part_pad->max_ts = ts;
|
|
GST_LOG_OBJECT (reader,
|
|
"pad %" GST_PTR_FORMAT " max TS now %" GST_TIME_FORMAT, part_pad,
|
|
GST_TIME_ARGS (part_pad->max_ts));
|
|
}
|
|
}
|
|
/* Is it time to move to measuring state yet? */
|
|
check_if_pads_collected (reader);
|
|
}
|
|
|
|
static gboolean
|
|
splitmux_data_queue_is_full_cb (GstDataQueue * queue,
|
|
guint visible, guint bytes, guint64 time, gpointer checkdata)
|
|
{
|
|
/* Arbitrary safety limit. If we hit it, playback is likely to stall */
|
|
if (time > 20 * GST_SECOND)
|
|
return TRUE;
|
|
return FALSE;
|
|
}
|
|
|
|
static void
|
|
splitmux_part_free_queue_item (GstDataQueueItem * item)
|
|
{
|
|
gst_mini_object_unref (item->object);
|
|
g_slice_free (GstDataQueueItem, item);
|
|
}
|
|
|
|
static GstFlowReturn
|
|
splitmux_part_pad_chain (GstPad * pad, GstObject * parent, GstBuffer * buf)
|
|
{
|
|
GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (pad);
|
|
GstSplitMuxPartReader *reader = part_pad->reader;
|
|
GstDataQueueItem *item;
|
|
GstClockTimeDiff offset;
|
|
|
|
GST_LOG_OBJECT (reader, "Pad %" GST_PTR_FORMAT " %" GST_PTR_FORMAT, pad, buf);
|
|
SPLITMUX_PART_LOCK (reader);
|
|
|
|
if (reader->prep_state == PART_STATE_PREPARING_COLLECT_STREAMS ||
|
|
reader->prep_state == PART_STATE_PREPARING_MEASURE_STREAMS) {
|
|
handle_buffer_measuring (reader, part_pad, buf);
|
|
gst_buffer_unref (buf);
|
|
SPLITMUX_PART_UNLOCK (reader);
|
|
return GST_FLOW_OK;
|
|
}
|
|
|
|
if (!block_until_can_push (reader)) {
|
|
/* Flushing */
|
|
SPLITMUX_PART_UNLOCK (reader);
|
|
gst_buffer_unref (buf);
|
|
return GST_FLOW_FLUSHING;
|
|
}
|
|
|
|
/* Adjust buffer timestamps */
|
|
offset = reader->start_offset + part_pad->segment.base;
|
|
offset -= part_pad->initial_ts_offset;
|
|
|
|
if (GST_BUFFER_PTS_IS_VALID (buf))
|
|
GST_BUFFER_PTS (buf) += offset;
|
|
if (GST_BUFFER_DTS_IS_VALID (buf))
|
|
GST_BUFFER_DTS (buf) += offset;
|
|
|
|
/* We are active, and one queue is empty, place this buffer in
|
|
* the dataqueue */
|
|
GST_LOG_OBJECT (reader, "Enqueueing buffer %" GST_PTR_FORMAT, buf);
|
|
item = g_slice_new (GstDataQueueItem);
|
|
item->destroy = (GDestroyNotify) splitmux_part_free_queue_item;
|
|
item->object = GST_MINI_OBJECT (buf);
|
|
item->size = gst_buffer_get_size (buf);
|
|
item->duration = GST_BUFFER_DURATION (buf);
|
|
if (item->duration == GST_CLOCK_TIME_NONE)
|
|
item->duration = 0;
|
|
item->visible = TRUE;
|
|
|
|
gst_object_ref (part_pad);
|
|
|
|
SPLITMUX_PART_UNLOCK (reader);
|
|
|
|
if (!gst_data_queue_push (part_pad->queue, item)) {
|
|
splitmux_part_free_queue_item (item);
|
|
gst_object_unref (part_pad);
|
|
return GST_FLOW_FLUSHING;
|
|
}
|
|
|
|
gst_object_unref (part_pad);
|
|
return GST_FLOW_OK;
|
|
}
|
|
|
|
/* Called with splitmux part lock held */
|
|
static gboolean
|
|
splitmux_part_is_eos_locked (GstSplitMuxPartReader * part)
|
|
{
|
|
GList *cur;
|
|
for (cur = g_list_first (part->pads); cur != NULL; cur = g_list_next (cur)) {
|
|
GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data);
|
|
if (!part_pad->is_eos)
|
|
return FALSE;
|
|
}
|
|
|
|
return TRUE;
|
|
}
|
|
|
|
static gboolean
|
|
splitmux_part_is_prerolled_locked (GstSplitMuxPartReader * part)
|
|
{
|
|
GList *cur;
|
|
GST_LOG_OBJECT (part, "Checking for preroll");
|
|
for (cur = g_list_first (part->pads); cur != NULL; cur = g_list_next (cur)) {
|
|
GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data);
|
|
if (!part_pad->seen_buffer) {
|
|
GST_LOG_OBJECT (part, "Part pad %" GST_PTR_FORMAT " is not prerolled",
|
|
part_pad);
|
|
return FALSE;
|
|
}
|
|
}
|
|
GST_LOG_OBJECT (part, "Part is prerolled");
|
|
return TRUE;
|
|
}
|
|
|
|
|
|
gboolean
|
|
gst_splitmux_part_is_eos (GstSplitMuxPartReader * reader)
|
|
{
|
|
gboolean res;
|
|
|
|
SPLITMUX_PART_LOCK (reader);
|
|
res = splitmux_part_is_eos_locked (reader);
|
|
SPLITMUX_PART_UNLOCK (reader);
|
|
|
|
return res;
|
|
}
|
|
|
|
/* Called with splitmux part lock held */
|
|
static gboolean
|
|
splitmux_is_flushing (GstSplitMuxPartReader * reader)
|
|
{
|
|
GList *cur;
|
|
for (cur = g_list_first (reader->pads); cur != NULL; cur = g_list_next (cur)) {
|
|
GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data);
|
|
if (part_pad->flushing)
|
|
return TRUE;
|
|
}
|
|
|
|
return FALSE;
|
|
}
|
|
|
|
static gboolean
|
|
splitmux_part_pad_event (GstPad * pad, GstObject * parent, GstEvent * event)
|
|
{
|
|
GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (pad);
|
|
GstSplitMuxPartReader *reader = part_pad->reader;
|
|
gboolean ret = TRUE;
|
|
SplitMuxSrcPad *target;
|
|
GstDataQueueItem *item;
|
|
|
|
SPLITMUX_PART_LOCK (reader);
|
|
|
|
target = gst_object_ref (part_pad->target);
|
|
|
|
GST_LOG_OBJECT (reader, "Pad %" GST_PTR_FORMAT " event %" GST_PTR_FORMAT, pad,
|
|
event);
|
|
|
|
if (part_pad->flushing && GST_EVENT_TYPE (event) != GST_EVENT_FLUSH_STOP)
|
|
goto drop_event;
|
|
|
|
switch (GST_EVENT_TYPE (event)) {
|
|
case GST_EVENT_SEGMENT:{
|
|
GstSegment *seg = &part_pad->segment;
|
|
|
|
GST_LOG_OBJECT (pad, "Received segment %" GST_PTR_FORMAT, event);
|
|
|
|
gst_event_copy_segment (event, seg);
|
|
gst_event_copy_segment (event, &part_pad->orig_segment);
|
|
|
|
if (seg->format != GST_FORMAT_TIME)
|
|
goto wrong_segment;
|
|
|
|
/* Adjust segment */
|
|
/* Adjust start/stop so the overall file is 0 + start_offset based */
|
|
if (seg->stop != -1) {
|
|
seg->stop -= seg->start;
|
|
seg->stop += seg->time + reader->start_offset;
|
|
}
|
|
seg->start = seg->time + reader->start_offset;
|
|
seg->time += reader->start_offset;
|
|
seg->position += reader->start_offset;
|
|
|
|
GST_LOG_OBJECT (pad, "Adjusted segment now %" GST_PTR_FORMAT, event);
|
|
|
|
/* Replace event */
|
|
gst_event_unref (event);
|
|
event = gst_event_new_segment (seg);
|
|
|
|
if (reader->prep_state != PART_STATE_PREPARING_COLLECT_STREAMS
|
|
&& reader->prep_state != PART_STATE_PREPARING_MEASURE_STREAMS)
|
|
break; /* Only do further stuff with segments during initial measuring */
|
|
|
|
/* Take the first segment from the first part */
|
|
if (target->segment.format == GST_FORMAT_UNDEFINED) {
|
|
gst_segment_copy_into (seg, &target->segment);
|
|
GST_DEBUG_OBJECT (reader,
|
|
"Target pad segment now %" GST_SEGMENT_FORMAT, &target->segment);
|
|
}
|
|
|
|
if (seg->stop != -1 && target->segment.stop != -1) {
|
|
GstClockTime stop = seg->base + seg->stop;
|
|
if (stop > target->segment.stop) {
|
|
target->segment.stop = stop;
|
|
GST_DEBUG_OBJECT (reader,
|
|
"Adjusting segment stop by %" GST_TIME_FORMAT
|
|
" output now %" GST_SEGMENT_FORMAT,
|
|
GST_TIME_ARGS (reader->start_offset), &target->segment);
|
|
}
|
|
}
|
|
GST_LOG_OBJECT (pad, "Forwarding segment %" GST_PTR_FORMAT, event);
|
|
break;
|
|
}
|
|
case GST_EVENT_EOS:{
|
|
|
|
GST_DEBUG_OBJECT (part_pad,
|
|
"State %u EOS event. MaxTS seen %" GST_TIME_FORMAT,
|
|
reader->prep_state, GST_TIME_ARGS (part_pad->max_ts));
|
|
|
|
if (reader->prep_state == PART_STATE_PREPARING_COLLECT_STREAMS ||
|
|
reader->prep_state == PART_STATE_PREPARING_MEASURE_STREAMS) {
|
|
/* Mark this pad as EOS */
|
|
part_pad->is_eos = TRUE;
|
|
if (splitmux_part_is_eos_locked (reader)) {
|
|
/* Finished measuring things, set state and tell the state change func
|
|
* so it can seek back to the start */
|
|
GST_LOG_OBJECT (reader,
|
|
"EOS while measuring streams. Resetting for ready");
|
|
reader->prep_state = PART_STATE_PREPARING_RESET_FOR_READY;
|
|
SPLITMUX_PART_BROADCAST (reader);
|
|
}
|
|
goto drop_event;
|
|
}
|
|
break;
|
|
}
|
|
case GST_EVENT_FLUSH_START:
|
|
reader->flushing = TRUE;
|
|
part_pad->flushing = TRUE;
|
|
GST_LOG_OBJECT (reader, "Pad %" GST_PTR_FORMAT " flushing dataqueue",
|
|
part_pad);
|
|
gst_data_queue_set_flushing (part_pad->queue, TRUE);
|
|
SPLITMUX_PART_BROADCAST (reader);
|
|
break;
|
|
case GST_EVENT_FLUSH_STOP:{
|
|
gst_data_queue_set_flushing (part_pad->queue, FALSE);
|
|
gst_data_queue_flush (part_pad->queue);
|
|
part_pad->seen_buffer = FALSE;
|
|
part_pad->flushing = FALSE;
|
|
part_pad->is_eos = FALSE;
|
|
|
|
reader->flushing = splitmux_is_flushing (reader);
|
|
GST_LOG_OBJECT (reader,
|
|
"%s pad %" GST_PTR_FORMAT " flush_stop. Overall flushing=%d",
|
|
reader->path, pad, reader->flushing);
|
|
SPLITMUX_PART_BROADCAST (reader);
|
|
break;
|
|
}
|
|
default:
|
|
break;
|
|
}
|
|
|
|
/* Don't send events downstream while preparing */
|
|
if (reader->prep_state != PART_STATE_READY)
|
|
goto drop_event;
|
|
|
|
/* Don't pass flush events - those are done by the parent */
|
|
if (GST_EVENT_TYPE (event) == GST_EVENT_FLUSH_START ||
|
|
GST_EVENT_TYPE (event) == GST_EVENT_FLUSH_STOP)
|
|
goto drop_event;
|
|
|
|
if (!block_until_can_push (reader)) {
|
|
SPLITMUX_PART_UNLOCK (reader);
|
|
gst_object_unref (target);
|
|
gst_event_unref (event);
|
|
return FALSE;
|
|
}
|
|
|
|
switch (GST_EVENT_TYPE (event)) {
|
|
case GST_EVENT_GAP:{
|
|
/* FIXME: Drop initial gap (if any) in each segment, not all GAPs */
|
|
goto drop_event;
|
|
}
|
|
default:
|
|
break;
|
|
}
|
|
|
|
/* We are active, and one queue is empty, place this buffer in
|
|
* the dataqueue */
|
|
gst_object_ref (part_pad->queue);
|
|
SPLITMUX_PART_UNLOCK (reader);
|
|
|
|
GST_LOG_OBJECT (reader, "Enqueueing event %" GST_PTR_FORMAT, event);
|
|
item = g_slice_new (GstDataQueueItem);
|
|
item->destroy = (GDestroyNotify) splitmux_part_free_queue_item;
|
|
item->object = GST_MINI_OBJECT (event);
|
|
item->size = 0;
|
|
item->duration = 0;
|
|
if (item->duration == GST_CLOCK_TIME_NONE)
|
|
item->duration = 0;
|
|
item->visible = FALSE;
|
|
|
|
if (!gst_data_queue_push (part_pad->queue, item)) {
|
|
splitmux_part_free_queue_item (item);
|
|
ret = FALSE;
|
|
}
|
|
|
|
gst_object_unref (part_pad->queue);
|
|
gst_object_unref (target);
|
|
|
|
return ret;
|
|
wrong_segment:
|
|
gst_event_unref (event);
|
|
gst_object_unref (target);
|
|
SPLITMUX_PART_UNLOCK (reader);
|
|
GST_ELEMENT_ERROR (reader, STREAM, FAILED, (NULL),
|
|
("Received non-time segment - reader %s pad %" GST_PTR_FORMAT,
|
|
reader->path, pad));
|
|
return FALSE;
|
|
drop_event:
|
|
GST_LOG_OBJECT (pad, "Dropping event %" GST_PTR_FORMAT
|
|
" from %" GST_PTR_FORMAT " on %" GST_PTR_FORMAT, event, pad, target);
|
|
gst_event_unref (event);
|
|
gst_object_unref (target);
|
|
SPLITMUX_PART_UNLOCK (reader);
|
|
return TRUE;
|
|
}
|
|
|
|
static gboolean
|
|
splitmux_part_pad_query (GstPad * pad, GstObject * parent, GstQuery * query)
|
|
{
|
|
GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (pad);
|
|
GstSplitMuxPartReader *reader = part_pad->reader;
|
|
GstPad *target;
|
|
gboolean ret = FALSE;
|
|
gboolean active;
|
|
|
|
SPLITMUX_PART_LOCK (reader);
|
|
target = gst_object_ref (part_pad->target);
|
|
active = reader->active;
|
|
SPLITMUX_PART_UNLOCK (reader);
|
|
|
|
if (active) {
|
|
GST_LOG_OBJECT (pad, "Forwarding query %" GST_PTR_FORMAT
|
|
" from %" GST_PTR_FORMAT " on %" GST_PTR_FORMAT, query, pad, target);
|
|
|
|
ret = gst_pad_query (target, query);
|
|
}
|
|
|
|
gst_object_unref (target);
|
|
|
|
return ret;
|
|
}
|
|
|
|
G_DEFINE_TYPE (GstSplitMuxPartPad, gst_splitmux_part_pad, GST_TYPE_PAD);
|
|
|
|
static void
|
|
splitmux_part_pad_constructed (GObject * pad)
|
|
{
|
|
gst_pad_set_chain_function (GST_PAD (pad),
|
|
GST_DEBUG_FUNCPTR (splitmux_part_pad_chain));
|
|
gst_pad_set_event_function (GST_PAD (pad),
|
|
GST_DEBUG_FUNCPTR (splitmux_part_pad_event));
|
|
gst_pad_set_query_function (GST_PAD (pad),
|
|
GST_DEBUG_FUNCPTR (splitmux_part_pad_query));
|
|
|
|
G_OBJECT_CLASS (gst_splitmux_part_pad_parent_class)->constructed (pad);
|
|
}
|
|
|
|
static void
|
|
gst_splitmux_part_pad_class_init (GstSplitMuxPartPadClass * klass)
|
|
{
|
|
GObjectClass *gobject_klass = (GObjectClass *) (klass);
|
|
|
|
gobject_klass->constructed = splitmux_part_pad_constructed;
|
|
gobject_klass->finalize = splitmux_part_pad_finalize;
|
|
}
|
|
|
|
static void
|
|
gst_splitmux_part_pad_init (GstSplitMuxPartPad * pad)
|
|
{
|
|
pad->queue = gst_data_queue_new (splitmux_data_queue_is_full_cb,
|
|
NULL, NULL, pad);
|
|
gst_segment_init (&pad->segment, GST_FORMAT_UNDEFINED);
|
|
gst_segment_init (&pad->orig_segment, GST_FORMAT_UNDEFINED);
|
|
}
|
|
|
|
static void
|
|
splitmux_part_pad_finalize (GObject * obj)
|
|
{
|
|
GstSplitMuxPartPad *pad = (GstSplitMuxPartPad *) (obj);
|
|
|
|
GST_DEBUG_OBJECT (obj, "finalize");
|
|
gst_data_queue_set_flushing (pad->queue, TRUE);
|
|
gst_data_queue_flush (pad->queue);
|
|
gst_object_unref (GST_OBJECT_CAST (pad->queue));
|
|
pad->queue = NULL;
|
|
|
|
G_OBJECT_CLASS (gst_splitmux_part_pad_parent_class)->finalize (obj);
|
|
}
|
|
|
|
static void
|
|
new_decoded_pad_added_cb (GstElement * element, GstPad * pad,
|
|
GstSplitMuxPartReader * part);
|
|
static void no_more_pads (GstElement * element, GstSplitMuxPartReader * reader);
|
|
static GstStateChangeReturn
|
|
gst_splitmux_part_reader_change_state (GstElement * element,
|
|
GstStateChange transition);
|
|
static gboolean gst_splitmux_part_reader_send_event (GstElement * element,
|
|
GstEvent * event);
|
|
static void gst_splitmux_part_reader_set_flushing_locked (GstSplitMuxPartReader
|
|
* part, gboolean flushing);
|
|
static void bus_handler (GstBin * bin, GstMessage * msg);
|
|
static void splitmux_part_reader_dispose (GObject * object);
|
|
static void splitmux_part_reader_finalize (GObject * object);
|
|
static void splitmux_part_reader_reset (GstSplitMuxPartReader * reader);
|
|
|
|
#define gst_splitmux_part_reader_parent_class parent_class
|
|
G_DEFINE_TYPE (GstSplitMuxPartReader, gst_splitmux_part_reader,
|
|
GST_TYPE_PIPELINE);
|
|
|
|
static void
|
|
gst_splitmux_part_reader_class_init (GstSplitMuxPartReaderClass * klass)
|
|
{
|
|
GObjectClass *gobject_klass = (GObjectClass *) (klass);
|
|
GstElementClass *gstelement_class = (GstElementClass *) klass;
|
|
GstBinClass *gstbin_class = (GstBinClass *) klass;
|
|
|
|
GST_DEBUG_CATEGORY_INIT (splitmux_part_debug, "splitmuxpartreader", 0,
|
|
"Split File Demuxing Source helper");
|
|
|
|
gobject_klass->dispose = splitmux_part_reader_dispose;
|
|
gobject_klass->finalize = splitmux_part_reader_finalize;
|
|
|
|
part_reader_signals[SIGNAL_PREPARED] =
|
|
g_signal_new ("prepared", G_TYPE_FROM_CLASS (klass),
|
|
G_SIGNAL_RUN_FIRST, G_STRUCT_OFFSET (GstSplitMuxPartReaderClass,
|
|
prepared), NULL, NULL, g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
|
|
gstelement_class->change_state = gst_splitmux_part_reader_change_state;
|
|
gstelement_class->send_event = gst_splitmux_part_reader_send_event;
|
|
|
|
gstbin_class->handle_message = bus_handler;
|
|
}
|
|
|
|
static void
|
|
gst_splitmux_part_reader_init (GstSplitMuxPartReader * reader)
|
|
{
|
|
GstElement *typefind;
|
|
|
|
reader->active = FALSE;
|
|
reader->duration = GST_CLOCK_TIME_NONE;
|
|
|
|
g_cond_init (&reader->inactive_cond);
|
|
g_mutex_init (&reader->lock);
|
|
g_mutex_init (&reader->type_lock);
|
|
|
|
/* FIXME: Create elements on a state change */
|
|
reader->src = gst_element_factory_make ("filesrc", NULL);
|
|
if (reader->src == NULL) {
|
|
GST_ERROR_OBJECT (reader, "Failed to create filesrc element");
|
|
return;
|
|
}
|
|
gst_bin_add (GST_BIN_CAST (reader), reader->src);
|
|
|
|
typefind = gst_element_factory_make ("typefind", NULL);
|
|
if (!typefind) {
|
|
GST_ERROR_OBJECT (reader,
|
|
"Failed to create typefind element - check your installation");
|
|
return;
|
|
}
|
|
|
|
gst_bin_add (GST_BIN_CAST (reader), typefind);
|
|
reader->typefind = typefind;
|
|
|
|
if (!gst_element_link_pads (reader->src, NULL, typefind, "sink")) {
|
|
GST_ERROR_OBJECT (reader,
|
|
"Failed to link typefind element - check your installation");
|
|
return;
|
|
}
|
|
|
|
g_signal_connect (reader->typefind, "have-type", G_CALLBACK (type_found),
|
|
reader);
|
|
}
|
|
|
|
static void
|
|
splitmux_part_reader_dispose (GObject * object)
|
|
{
|
|
GstSplitMuxPartReader *reader = (GstSplitMuxPartReader *) object;
|
|
|
|
splitmux_part_reader_reset (reader);
|
|
|
|
G_OBJECT_CLASS (parent_class)->dispose (object);
|
|
}
|
|
|
|
static void
|
|
splitmux_part_reader_finalize (GObject * object)
|
|
{
|
|
GstSplitMuxPartReader *reader = (GstSplitMuxPartReader *) object;
|
|
|
|
g_mutex_clear (&reader->lock);
|
|
g_mutex_clear (&reader->type_lock);
|
|
|
|
g_free (reader->path);
|
|
|
|
G_OBJECT_CLASS (parent_class)->finalize (object);
|
|
}
|
|
|
|
static void
|
|
splitmux_part_reader_reset (GstSplitMuxPartReader * reader)
|
|
{
|
|
GList *cur;
|
|
|
|
SPLITMUX_PART_LOCK (reader);
|
|
for (cur = g_list_first (reader->pads); cur != NULL; cur = g_list_next (cur)) {
|
|
GstPad *pad = GST_PAD_CAST (cur->data);
|
|
gst_pad_set_active (GST_PAD_CAST (pad), FALSE);
|
|
gst_object_unref (GST_OBJECT_CAST (pad));
|
|
}
|
|
|
|
g_list_free (reader->pads);
|
|
reader->pads = NULL;
|
|
SPLITMUX_PART_UNLOCK (reader);
|
|
}
|
|
|
|
static GstSplitMuxPartPad *
|
|
gst_splitmux_part_reader_new_proxy_pad (GstSplitMuxPartReader * reader,
|
|
GstPad * target)
|
|
{
|
|
GstSplitMuxPartPad *pad = g_object_new (SPLITMUX_TYPE_PART_PAD,
|
|
"name", GST_PAD_NAME (target),
|
|
"direction", GST_PAD_SINK,
|
|
NULL);
|
|
pad->target = target;
|
|
pad->reader = reader;
|
|
|
|
gst_pad_set_active (GST_PAD_CAST (pad), TRUE);
|
|
|
|
return pad;
|
|
}
|
|
|
|
static void
|
|
new_decoded_pad_added_cb (GstElement * element, GstPad * pad,
|
|
GstSplitMuxPartReader * reader)
|
|
{
|
|
GstPad *out_pad = NULL;
|
|
GstSplitMuxPartPad *proxy_pad;
|
|
GstCaps *caps;
|
|
GstPadLinkReturn link_ret;
|
|
|
|
caps = gst_pad_get_current_caps (pad);
|
|
|
|
GST_DEBUG_OBJECT (reader, "file %s new decoded pad %" GST_PTR_FORMAT
|
|
" caps %" GST_PTR_FORMAT, reader->path, pad, caps);
|
|
|
|
gst_caps_unref (caps);
|
|
|
|
/* Look up or create the output pad */
|
|
if (reader->get_pad_cb)
|
|
out_pad = reader->get_pad_cb (reader, pad, reader->cb_data);
|
|
if (out_pad == NULL)
|
|
return;
|
|
|
|
/* Create our proxy pad to interact with this new pad */
|
|
proxy_pad = gst_splitmux_part_reader_new_proxy_pad (reader, out_pad);
|
|
GST_DEBUG_OBJECT (reader,
|
|
"created proxy pad %" GST_PTR_FORMAT " for target %" GST_PTR_FORMAT,
|
|
proxy_pad, out_pad);
|
|
|
|
link_ret = gst_pad_link (pad, GST_PAD (proxy_pad));
|
|
if (link_ret != GST_PAD_LINK_OK) {
|
|
gst_object_unref (proxy_pad);
|
|
GST_ELEMENT_ERROR (reader, STREAM, FAILED, (NULL),
|
|
("Failed to link proxy pad for stream part %s pad %" GST_PTR_FORMAT
|
|
" ret %d", reader->path, pad, link_ret));
|
|
return;
|
|
}
|
|
GST_DEBUG_OBJECT (reader,
|
|
"new decoded pad %" GST_PTR_FORMAT " linked to %" GST_PTR_FORMAT,
|
|
pad, proxy_pad);
|
|
|
|
SPLITMUX_PART_LOCK (reader);
|
|
reader->pads = g_list_prepend (reader->pads, proxy_pad);
|
|
SPLITMUX_PART_UNLOCK (reader);
|
|
}
|
|
|
|
static gboolean
|
|
gst_splitmux_part_reader_send_event (GstElement * element, GstEvent * event)
|
|
{
|
|
GstSplitMuxPartReader *reader = (GstSplitMuxPartReader *) element;
|
|
gboolean ret = FALSE;
|
|
GstPad *pad = NULL;
|
|
|
|
/* Send event to the first source pad we found */
|
|
SPLITMUX_PART_LOCK (reader);
|
|
if (reader->pads) {
|
|
GstPad *proxy_pad = GST_PAD_CAST (reader->pads->data);
|
|
pad = gst_pad_get_peer (proxy_pad);
|
|
}
|
|
SPLITMUX_PART_UNLOCK (reader);
|
|
|
|
if (pad) {
|
|
ret = gst_pad_send_event (pad, event);
|
|
gst_object_unref (pad);
|
|
} else {
|
|
gst_event_unref (event);
|
|
}
|
|
|
|
return ret;
|
|
}
|
|
|
|
/* Called with lock held. Seeks to an 'internal' time from 0 to length of this piece */
|
|
static void
|
|
gst_splitmux_part_reader_seek_to_time_locked (GstSplitMuxPartReader * reader,
|
|
GstClockTime time)
|
|
{
|
|
SPLITMUX_PART_UNLOCK (reader);
|
|
GST_DEBUG_OBJECT (reader, "Seeking to time %" GST_TIME_FORMAT,
|
|
GST_TIME_ARGS (time));
|
|
gst_element_seek (GST_ELEMENT_CAST (reader), 1.0, GST_FORMAT_TIME,
|
|
GST_SEEK_FLAG_FLUSH | GST_SEEK_FLAG_ACCURATE, GST_SEEK_TYPE_SET, time,
|
|
GST_SEEK_TYPE_END, 0);
|
|
|
|
SPLITMUX_PART_LOCK (reader);
|
|
|
|
/* Wait for flush to finish, so old data is gone */
|
|
while (reader->flushing) {
|
|
GST_LOG_OBJECT (reader, "%s Waiting for flush to finish", reader->path);
|
|
SPLITMUX_PART_WAIT (reader);
|
|
}
|
|
}
|
|
|
|
/* Map the passed segment to 'internal' time from 0 to length of this piece and seek. Lock cannot be held */
|
|
static gboolean
|
|
gst_splitmux_part_reader_seek_to_segment (GstSplitMuxPartReader * reader,
|
|
GstSegment * target_seg)
|
|
{
|
|
GstSeekFlags flags;
|
|
GstClockTime start = 0, stop = GST_CLOCK_TIME_NONE;
|
|
|
|
flags = target_seg->flags | GST_SEEK_FLAG_FLUSH;
|
|
|
|
SPLITMUX_PART_LOCK (reader);
|
|
if (target_seg->start >= reader->start_offset)
|
|
start = target_seg->start - reader->start_offset;
|
|
/* If the segment stop is within this part, don't play to the end */
|
|
if (target_seg->stop != -1 &&
|
|
target_seg->stop < reader->start_offset + reader->duration)
|
|
stop = target_seg->stop - reader->start_offset;
|
|
|
|
SPLITMUX_PART_UNLOCK (reader);
|
|
|
|
GST_DEBUG_OBJECT (reader,
|
|
"Seeking rate %f format %d flags 0x%x start %" GST_TIME_FORMAT " stop %"
|
|
GST_TIME_FORMAT, target_seg->rate, target_seg->format, flags,
|
|
GST_TIME_ARGS (start), GST_TIME_ARGS (stop));
|
|
|
|
return gst_element_seek (GST_ELEMENT_CAST (reader), target_seg->rate,
|
|
target_seg->format, flags, GST_SEEK_TYPE_SET, start, GST_SEEK_TYPE_SET,
|
|
stop);
|
|
}
|
|
|
|
/* Called with lock held */
|
|
static void
|
|
gst_splitmux_part_reader_measure_streams (GstSplitMuxPartReader * reader)
|
|
{
|
|
/* Trigger a flushing seek to near the end of the file and run each stream
|
|
* to EOS in order to find the smallest end timestamp to start the next
|
|
* file from
|
|
*/
|
|
if (GST_CLOCK_TIME_IS_VALID (reader->duration)
|
|
&& reader->duration > GST_SECOND) {
|
|
GstClockTime seek_ts = reader->duration - (0.5 * GST_SECOND);
|
|
gst_splitmux_part_reader_seek_to_time_locked (reader, seek_ts);
|
|
}
|
|
|
|
/* Wait for things to happen */
|
|
while (reader->prep_state == PART_STATE_PREPARING_MEASURE_STREAMS)
|
|
SPLITMUX_PART_WAIT (reader);
|
|
|
|
if (reader->prep_state == PART_STATE_PREPARING_RESET_FOR_READY) {
|
|
/* Fire the prepared signal and go to READY state */
|
|
GST_DEBUG_OBJECT (reader,
|
|
"Stream measuring complete. File %s is now ready. Firing prepared signal",
|
|
reader->path);
|
|
reader->prep_state = PART_STATE_READY;
|
|
g_signal_emit (reader, part_reader_signals[SIGNAL_PREPARED], 0, NULL);
|
|
}
|
|
}
|
|
|
|
static GstElement *
|
|
find_demuxer (GstCaps * caps)
|
|
{
|
|
GList *factories =
|
|
gst_element_factory_list_get_elements (GST_ELEMENT_FACTORY_TYPE_DEMUXER,
|
|
GST_RANK_MARGINAL);
|
|
GList *compat_elements;
|
|
GstElement *e = NULL;
|
|
|
|
if (factories == NULL)
|
|
return NULL;
|
|
|
|
compat_elements =
|
|
gst_element_factory_list_filter (factories, caps, GST_PAD_SINK, TRUE);
|
|
|
|
if (compat_elements) {
|
|
/* Just take the first (highest ranked) option */
|
|
GstElementFactory *factory =
|
|
GST_ELEMENT_FACTORY_CAST (compat_elements->data);
|
|
e = gst_element_factory_create (factory, NULL);
|
|
gst_plugin_feature_list_free (compat_elements);
|
|
}
|
|
|
|
if (factories)
|
|
gst_plugin_feature_list_free (factories);
|
|
|
|
return e;
|
|
}
|
|
|
|
static void
|
|
type_found (GstElement * typefind, guint probability,
|
|
GstCaps * caps, GstSplitMuxPartReader * reader)
|
|
{
|
|
GstElement *demux;
|
|
|
|
GST_INFO_OBJECT (reader, "Got type %" GST_PTR_FORMAT, caps);
|
|
|
|
/* typefind found a type. Look for the demuxer to handle it */
|
|
demux = reader->demux = find_demuxer (caps);
|
|
if (reader->demux == NULL) {
|
|
GST_ERROR_OBJECT (reader, "Failed to create demuxer element");
|
|
return;
|
|
}
|
|
|
|
gst_element_set_locked_state (demux, TRUE);
|
|
gst_bin_add (GST_BIN_CAST (reader), demux);
|
|
gst_element_link_pads (reader->typefind, "src", demux, NULL);
|
|
gst_element_sync_state_with_parent (reader->demux);
|
|
gst_element_set_locked_state (demux, FALSE);
|
|
|
|
/* Connect to demux signals */
|
|
g_signal_connect (demux,
|
|
"pad-added", G_CALLBACK (new_decoded_pad_added_cb), reader);
|
|
g_signal_connect (demux, "no-more-pads", G_CALLBACK (no_more_pads), reader);
|
|
}
|
|
|
|
static void
|
|
check_if_pads_collected (GstSplitMuxPartReader * reader)
|
|
{
|
|
if (reader->prep_state == PART_STATE_PREPARING_COLLECT_STREAMS) {
|
|
/* Check we have all pads and each pad has seen a buffer */
|
|
if (reader->no_more_pads && splitmux_part_is_prerolled_locked (reader)) {
|
|
GST_DEBUG_OBJECT (reader,
|
|
"no more pads - file %s. Measuring stream length", reader->path);
|
|
reader->prep_state = PART_STATE_PREPARING_MEASURE_STREAMS;
|
|
SPLITMUX_PART_BROADCAST (reader);
|
|
}
|
|
}
|
|
}
|
|
|
|
static void
|
|
no_more_pads (GstElement * element, GstSplitMuxPartReader * reader)
|
|
{
|
|
GstClockTime duration = GST_CLOCK_TIME_NONE;
|
|
GList *cur;
|
|
/* Query the minimum duration of any pad in this piece and store it.
|
|
* FIXME: Only consider audio and video */
|
|
SPLITMUX_PART_LOCK (reader);
|
|
for (cur = g_list_first (reader->pads); cur != NULL; cur = g_list_next (cur)) {
|
|
GstPad *target = GST_PAD_CAST (cur->data);
|
|
if (target) {
|
|
gint64 cur_duration;
|
|
if (gst_pad_peer_query_duration (target, GST_FORMAT_TIME, &cur_duration)) {
|
|
GST_INFO_OBJECT (reader,
|
|
"file %s pad %" GST_PTR_FORMAT " duration %" GST_TIME_FORMAT,
|
|
reader->path, target, GST_TIME_ARGS (cur_duration));
|
|
if (cur_duration < duration)
|
|
duration = cur_duration;
|
|
}
|
|
}
|
|
}
|
|
GST_INFO_OBJECT (reader, "file %s duration %" GST_TIME_FORMAT,
|
|
reader->path, GST_TIME_ARGS (duration));
|
|
reader->duration = (GstClockTime) duration;
|
|
|
|
reader->no_more_pads = TRUE;
|
|
|
|
check_if_pads_collected (reader);
|
|
SPLITMUX_PART_UNLOCK (reader);
|
|
}
|
|
|
|
gboolean
|
|
gst_splitmux_part_reader_src_query (GstSplitMuxPartReader * part,
|
|
GstPad * src_pad, GstQuery * query)
|
|
{
|
|
GstPad *target = NULL;
|
|
gboolean ret;
|
|
GList *cur;
|
|
|
|
SPLITMUX_PART_LOCK (part);
|
|
/* Find the pad corresponding to the visible output target pad */
|
|
for (cur = g_list_first (part->pads); cur != NULL; cur = g_list_next (cur)) {
|
|
GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data);
|
|
if (part_pad->target == src_pad) {
|
|
target = gst_object_ref (GST_OBJECT_CAST (part_pad));
|
|
break;
|
|
}
|
|
}
|
|
SPLITMUX_PART_UNLOCK (part);
|
|
|
|
if (target == NULL)
|
|
return FALSE;
|
|
|
|
ret = gst_pad_peer_query (target, query);
|
|
gst_object_unref (GST_OBJECT_CAST (target));
|
|
|
|
if (ret == FALSE)
|
|
goto out;
|
|
|
|
/* Post-massaging of queries */
|
|
switch (GST_QUERY_TYPE (query)) {
|
|
case GST_QUERY_POSITION:{
|
|
GstFormat fmt;
|
|
gint64 position;
|
|
|
|
gst_query_parse_position (query, &fmt, &position);
|
|
if (fmt != GST_FORMAT_TIME)
|
|
return FALSE;
|
|
SPLITMUX_PART_LOCK (part);
|
|
position += part->start_offset;
|
|
GST_LOG_OBJECT (part, "Position %" GST_TIME_FORMAT,
|
|
GST_TIME_ARGS (position));
|
|
SPLITMUX_PART_UNLOCK (part);
|
|
|
|
gst_query_set_position (query, fmt, position);
|
|
break;
|
|
}
|
|
default:
|
|
break;
|
|
}
|
|
|
|
out:
|
|
gst_object_unref (target);
|
|
return ret;
|
|
}
|
|
|
|
static GstStateChangeReturn
|
|
gst_splitmux_part_reader_change_state (GstElement * element,
|
|
GstStateChange transition)
|
|
{
|
|
GstStateChangeReturn ret;
|
|
GstSplitMuxPartReader *reader = (GstSplitMuxPartReader *) element;
|
|
|
|
switch (transition) {
|
|
case GST_STATE_CHANGE_NULL_TO_READY:{
|
|
break;
|
|
}
|
|
case GST_STATE_CHANGE_READY_TO_PAUSED:{
|
|
/* Hold the splitmux type lock until after the
|
|
* parent state change function has finished
|
|
* changing the states of things, and type finding can continue */
|
|
SPLITMUX_PART_LOCK (reader);
|
|
g_object_set (reader->src, "location", reader->path, NULL);
|
|
SPLITMUX_PART_UNLOCK (reader);
|
|
SPLITMUX_PART_TYPE_LOCK (reader);
|
|
break;
|
|
}
|
|
case GST_STATE_CHANGE_READY_TO_NULL:
|
|
case GST_STATE_CHANGE_PAUSED_TO_READY:
|
|
SPLITMUX_PART_LOCK (reader);
|
|
gst_splitmux_part_reader_set_flushing_locked (reader, TRUE);
|
|
reader->running = FALSE;
|
|
SPLITMUX_PART_BROADCAST (reader);
|
|
SPLITMUX_PART_UNLOCK (reader);
|
|
break;
|
|
case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
|
|
SPLITMUX_PART_LOCK (reader);
|
|
reader->active = FALSE;
|
|
gst_splitmux_part_reader_set_flushing_locked (reader, TRUE);
|
|
SPLITMUX_PART_BROADCAST (reader);
|
|
SPLITMUX_PART_UNLOCK (reader);
|
|
break;
|
|
default:
|
|
break;
|
|
}
|
|
|
|
ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
|
|
if (ret == GST_STATE_CHANGE_FAILURE) {
|
|
if (transition == GST_STATE_CHANGE_READY_TO_PAUSED) {
|
|
/* Make sure to release the lock we took above */
|
|
SPLITMUX_PART_TYPE_UNLOCK (reader);
|
|
}
|
|
goto beach;
|
|
}
|
|
|
|
switch (transition) {
|
|
case GST_STATE_CHANGE_READY_TO_PAUSED:
|
|
/* Sleep and wait until all streams have been collected, then do the seeks
|
|
* to measure the stream lengths. This took the type lock above,
|
|
* but it's OK to release it now and let typefinding happen... */
|
|
SPLITMUX_PART_TYPE_UNLOCK (reader);
|
|
|
|
SPLITMUX_PART_LOCK (reader);
|
|
reader->prep_state = PART_STATE_PREPARING_COLLECT_STREAMS;
|
|
gst_splitmux_part_reader_set_flushing_locked (reader, FALSE);
|
|
reader->running = TRUE;
|
|
|
|
while (reader->prep_state == PART_STATE_PREPARING_COLLECT_STREAMS) {
|
|
GST_LOG_OBJECT (reader, "Waiting to collect all output streams");
|
|
SPLITMUX_PART_WAIT (reader);
|
|
}
|
|
|
|
if (reader->prep_state == PART_STATE_PREPARING_MEASURE_STREAMS ||
|
|
reader->prep_state == PART_STATE_PREPARING_RESET_FOR_READY) {
|
|
gst_splitmux_part_reader_measure_streams (reader);
|
|
} else if (reader->prep_state == PART_STATE_FAILED)
|
|
ret = GST_STATE_CHANGE_FAILURE;
|
|
SPLITMUX_PART_UNLOCK (reader);
|
|
break;
|
|
case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
|
|
SPLITMUX_PART_LOCK (reader);
|
|
gst_splitmux_part_reader_set_flushing_locked (reader, FALSE);
|
|
reader->active = TRUE;
|
|
SPLITMUX_PART_BROADCAST (reader);
|
|
SPLITMUX_PART_UNLOCK (reader);
|
|
break;
|
|
case GST_STATE_CHANGE_READY_TO_NULL:
|
|
reader->prep_state = PART_STATE_NULL;
|
|
splitmux_part_reader_reset (reader);
|
|
break;
|
|
default:
|
|
break;
|
|
}
|
|
|
|
beach:
|
|
return ret;
|
|
}
|
|
|
|
static gboolean
|
|
check_bus_messages (GstSplitMuxPartReader * part)
|
|
{
|
|
gboolean ret = FALSE;
|
|
GstBus *bus;
|
|
GstMessage *m;
|
|
|
|
bus = gst_element_get_bus (GST_ELEMENT_CAST (part));
|
|
while ((m = gst_bus_pop (bus)) != NULL) {
|
|
if (GST_MESSAGE_TYPE (m) == GST_MESSAGE_ERROR) {
|
|
GST_LOG_OBJECT (part, "Got error message while preparing. Failing.");
|
|
gst_message_unref (m);
|
|
goto done;
|
|
}
|
|
gst_message_unref (m);
|
|
}
|
|
ret = TRUE;
|
|
done:
|
|
gst_object_unref (bus);
|
|
return ret;
|
|
}
|
|
|
|
gboolean
|
|
gst_splitmux_part_reader_prepare (GstSplitMuxPartReader * part)
|
|
{
|
|
GstStateChangeReturn ret;
|
|
|
|
ret = gst_element_set_state (GST_ELEMENT_CAST (part), GST_STATE_PAUSED);
|
|
|
|
if (ret != GST_STATE_CHANGE_SUCCESS)
|
|
return FALSE;
|
|
|
|
return check_bus_messages (part);
|
|
}
|
|
|
|
void
|
|
gst_splitmux_part_reader_unprepare (GstSplitMuxPartReader * part)
|
|
{
|
|
gst_element_set_state (GST_ELEMENT_CAST (part), GST_STATE_NULL);
|
|
}
|
|
|
|
void
|
|
gst_splitmux_part_reader_set_location (GstSplitMuxPartReader * reader,
|
|
const gchar * path)
|
|
{
|
|
reader->path = g_strdup (path);
|
|
}
|
|
|
|
gboolean
|
|
gst_splitmux_part_reader_activate (GstSplitMuxPartReader * reader,
|
|
GstSegment * seg)
|
|
{
|
|
GST_DEBUG_OBJECT (reader, "Activating part reader");
|
|
|
|
if (!gst_splitmux_part_reader_seek_to_segment (reader, seg)) {
|
|
GST_ERROR_OBJECT (reader, "Failed to seek part to %" GST_SEGMENT_FORMAT,
|
|
seg);
|
|
return FALSE;
|
|
}
|
|
if (gst_element_set_state (GST_ELEMENT_CAST (reader),
|
|
GST_STATE_PLAYING) == GST_STATE_CHANGE_FAILURE) {
|
|
GST_ERROR_OBJECT (reader, "Failed to set state to PLAYING");
|
|
return FALSE;
|
|
}
|
|
return TRUE;
|
|
}
|
|
|
|
void
|
|
gst_splitmux_part_reader_deactivate (GstSplitMuxPartReader * reader)
|
|
{
|
|
GST_DEBUG_OBJECT (reader, "Deactivating reader");
|
|
gst_element_set_state (GST_ELEMENT_CAST (reader), GST_STATE_PAUSED);
|
|
}
|
|
|
|
void
|
|
gst_splitmux_part_reader_set_flushing_locked (GstSplitMuxPartReader * reader,
|
|
gboolean flushing)
|
|
{
|
|
GList *cur;
|
|
|
|
GST_LOG_OBJECT (reader, "%s dataqueues",
|
|
flushing ? "Flushing" : "Done flushing");
|
|
for (cur = g_list_first (reader->pads); cur != NULL; cur = g_list_next (cur)) {
|
|
GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data);
|
|
gst_data_queue_set_flushing (part_pad->queue, flushing);
|
|
if (flushing)
|
|
gst_data_queue_flush (part_pad->queue);
|
|
}
|
|
};
|
|
|
|
void
|
|
gst_splitmux_part_reader_set_callbacks (GstSplitMuxPartReader * reader,
|
|
gpointer cb_data, GstSplitMuxPartReaderPadCb get_pad_cb)
|
|
{
|
|
reader->cb_data = cb_data;
|
|
reader->get_pad_cb = get_pad_cb;
|
|
}
|
|
|
|
GstClockTime
|
|
gst_splitmux_part_reader_get_end_offset (GstSplitMuxPartReader * reader)
|
|
{
|
|
GList *cur;
|
|
GstClockTime ret = GST_CLOCK_TIME_NONE;
|
|
|
|
SPLITMUX_PART_LOCK (reader);
|
|
for (cur = g_list_first (reader->pads); cur != NULL; cur = g_list_next (cur)) {
|
|
GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data);
|
|
if (part_pad->max_ts < ret)
|
|
ret = part_pad->max_ts;
|
|
}
|
|
|
|
SPLITMUX_PART_UNLOCK (reader);
|
|
|
|
return ret;
|
|
}
|
|
|
|
void
|
|
gst_splitmux_part_reader_set_start_offset (GstSplitMuxPartReader * reader,
|
|
GstClockTime offset)
|
|
{
|
|
SPLITMUX_PART_LOCK (reader);
|
|
reader->start_offset = offset;
|
|
GST_INFO_OBJECT (reader, "TS offset now %" GST_TIME_FORMAT,
|
|
GST_TIME_ARGS (offset));
|
|
SPLITMUX_PART_UNLOCK (reader);
|
|
}
|
|
|
|
GstClockTime
|
|
gst_splitmux_part_reader_get_start_offset (GstSplitMuxPartReader * reader)
|
|
{
|
|
GstClockTime ret = GST_CLOCK_TIME_NONE;
|
|
|
|
SPLITMUX_PART_LOCK (reader);
|
|
ret = reader->start_offset;
|
|
SPLITMUX_PART_UNLOCK (reader);
|
|
|
|
return ret;
|
|
}
|
|
|
|
GstClockTime
|
|
gst_splitmux_part_reader_get_duration (GstSplitMuxPartReader * reader)
|
|
{
|
|
GstClockTime dur;
|
|
|
|
SPLITMUX_PART_LOCK (reader);
|
|
dur = reader->duration;
|
|
SPLITMUX_PART_UNLOCK (reader);
|
|
|
|
return dur;
|
|
}
|
|
|
|
GstPad *
|
|
gst_splitmux_part_reader_lookup_pad (GstSplitMuxPartReader * reader,
|
|
GstPad * target)
|
|
{
|
|
GstPad *result = NULL;
|
|
GList *cur;
|
|
|
|
SPLITMUX_PART_LOCK (reader);
|
|
for (cur = g_list_first (reader->pads); cur != NULL; cur = g_list_next (cur)) {
|
|
GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data);
|
|
if (part_pad->target == target) {
|
|
result = (GstPad *) gst_object_ref (part_pad);
|
|
break;
|
|
}
|
|
}
|
|
SPLITMUX_PART_UNLOCK (reader);
|
|
|
|
return result;
|
|
}
|
|
|
|
GstFlowReturn
|
|
gst_splitmux_part_reader_pop (GstSplitMuxPartReader * reader, GstPad * pad,
|
|
GstDataQueueItem ** item)
|
|
{
|
|
GstSplitMuxPartPad *part_pad = (GstSplitMuxPartPad *) (pad);
|
|
GstDataQueue *q;
|
|
GstFlowReturn ret;
|
|
|
|
/* Get one item from the appropriate dataqueue */
|
|
SPLITMUX_PART_LOCK (reader);
|
|
if (reader->prep_state == PART_STATE_FAILED) {
|
|
SPLITMUX_PART_UNLOCK (reader);
|
|
return GST_FLOW_ERROR;
|
|
}
|
|
|
|
q = gst_object_ref (part_pad->queue);
|
|
|
|
/* Have to drop the lock around pop, so we can be woken up for flush */
|
|
SPLITMUX_PART_UNLOCK (reader);
|
|
if (!gst_data_queue_pop (q, item) || (*item == NULL)) {
|
|
ret = GST_FLOW_FLUSHING;
|
|
goto out;
|
|
}
|
|
|
|
SPLITMUX_PART_LOCK (reader);
|
|
|
|
SPLITMUX_PART_BROADCAST (reader);
|
|
if (GST_IS_EVENT ((*item)->object)) {
|
|
GstEvent *e = (GstEvent *) ((*item)->object);
|
|
/* Mark this pad as EOS */
|
|
if (GST_EVENT_TYPE (e) == GST_EVENT_EOS)
|
|
part_pad->is_eos = TRUE;
|
|
}
|
|
|
|
SPLITMUX_PART_UNLOCK (reader);
|
|
|
|
ret = GST_FLOW_OK;
|
|
out:
|
|
gst_object_unref (q);
|
|
return ret;
|
|
}
|
|
|
|
static void
|
|
bus_handler (GstBin * bin, GstMessage * message)
|
|
{
|
|
GstSplitMuxPartReader *reader = (GstSplitMuxPartReader *) bin;
|
|
|
|
switch (GST_MESSAGE_TYPE (message)) {
|
|
case GST_MESSAGE_ERROR:
|
|
/* Make sure to set the state to failed and wake up the listener
|
|
* on error */
|
|
SPLITMUX_PART_LOCK (reader);
|
|
reader->prep_state = PART_STATE_FAILED;
|
|
SPLITMUX_PART_BROADCAST (reader);
|
|
SPLITMUX_PART_UNLOCK (reader);
|
|
break;
|
|
default:
|
|
break;
|
|
}
|
|
|
|
GST_BIN_CLASS (parent_class)->handle_message (bin, message);
|
|
}
|