Implement push-based support for demuxers

Fixes #392534
This commit is contained in:
Edward Hervey 2009-01-27 11:39:18 +01:00
parent 91ac27e6de
commit 965f23f4f1
7 changed files with 396 additions and 19 deletions

View file

@ -28,4 +28,5 @@ noinst_HEADERS = \
gstffmpeg.h \
gstffmpegcodecmap.h \
gstffmpegenc.h \
gstffmpegcfg.h
gstffmpegcfg.h \
gstffmpegpipe.h

View file

@ -152,6 +152,7 @@ plugin_init (GstPlugin * plugin)
gst_ffmpegaudioresample_register (plugin);
register_protocol (&gstreamer_protocol);
register_protocol (&gstpipe_protocol);
/* Now we can return the pointer to the newly created Plugin object. */
return TRUE;

View file

@ -61,6 +61,7 @@ int gst_ffmpeg_av_find_stream_info(AVFormatContext *ic);
G_END_DECLS
extern URLProtocol gstreamer_protocol;
extern URLProtocol gstpipe_protocol;
/* use GST_FFMPEG URL_STREAMHEADER with URL_WRONLY if the first
* buffer should be used as streamheader property on the pad's caps. */

View file

@ -39,6 +39,7 @@
#include "gstffmpeg.h"
#include "gstffmpegcodecmap.h"
#include "gstffmpegpipe.h"
typedef struct _GstFFMpegDemux GstFFMpegDemux;
typedef struct _GstFFStream GstFFStream;
@ -83,6 +84,11 @@ struct _GstFFMpegDemux
/* cached seek in READY */
GstEvent *seek_event;
/* push mode data */
GstFFMpegPipe ffpipe;
GstTask *task;
GStaticRecMutex *task_lock;
};
typedef struct _GstFFMpegDemuxClassParams
@ -107,8 +113,12 @@ struct _GstFFMpegDemuxClass
static void gst_ffmpegdemux_class_init (GstFFMpegDemuxClass * klass);
static void gst_ffmpegdemux_base_init (GstFFMpegDemuxClass * klass);
static void gst_ffmpegdemux_init (GstFFMpegDemux * demux);
static void gst_ffmpegdemux_finalize (GObject * object);
static void gst_ffmpegdemux_loop (GstPad * pad);
static gboolean gst_ffmpegdemux_sink_event (GstPad * sinkpad, GstEvent * event);
static GstFlowReturn gst_ffmpegdemux_chain (GstPad * sinkpad, GstBuffer * buf);
static void gst_ffmpegdemux_loop (GstFFMpegDemux * demux);
static gboolean gst_ffmpegdemux_sink_activate (GstPad * sinkpad);
static gboolean
gst_ffmpegdemux_sink_activate_pull (GstPad * sinkpad, gboolean active);
@ -220,6 +230,8 @@ gst_ffmpegdemux_class_init (GstFFMpegDemuxClass * klass)
parent_class = g_type_class_peek_parent (klass);
gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_ffmpegdemux_finalize);
gstelement_class->change_state = gst_ffmpegdemux_change_state;
gstelement_class->send_event = gst_ffmpegdemux_send_event;
}
@ -232,13 +244,26 @@ gst_ffmpegdemux_init (GstFFMpegDemux * demux)
gint n;
demux->sinkpad = gst_pad_new_from_template (oclass->sinktempl, "sink");
gst_pad_set_activate_function (demux->sinkpad, gst_ffmpegdemux_sink_activate);
gst_pad_set_activate_function (demux->sinkpad,
GST_DEBUG_FUNCPTR (gst_ffmpegdemux_sink_activate));
gst_pad_set_activatepull_function (demux->sinkpad,
gst_ffmpegdemux_sink_activate_pull);
GST_DEBUG_FUNCPTR (gst_ffmpegdemux_sink_activate_pull));
gst_pad_set_activatepush_function (demux->sinkpad,
gst_ffmpegdemux_sink_activate_push);
GST_DEBUG_FUNCPTR (gst_ffmpegdemux_sink_activate_push));
gst_element_add_pad (GST_ELEMENT (demux), demux->sinkpad);
/* push based setup */
/* the following are not used in pull-based mode, so safe to set anyway */
gst_pad_set_event_function (demux->sinkpad,
GST_DEBUG_FUNCPTR (gst_ffmpegdemux_sink_event));
gst_pad_set_chain_function (demux->sinkpad,
GST_DEBUG_FUNCPTR (gst_ffmpegdemux_chain));
/* task for driving ffmpeg in loop function */
demux->task = gst_task_create ((GstTaskFunction) gst_ffmpegdemux_loop, demux);
demux->task_lock = g_new (GStaticRecMutex, 1);
g_static_rec_mutex_init (demux->task_lock);
gst_task_set_lock (demux->task, demux->task_lock);
demux->opened = FALSE;
demux->context = NULL;
@ -250,6 +275,29 @@ gst_ffmpegdemux_init (GstFFMpegDemux * demux)
demux->seek_event = NULL;
gst_segment_init (&demux->segment, GST_FORMAT_TIME);
/* push based data */
demux->ffpipe.tlock = g_mutex_new ();
demux->ffpipe.cond = g_cond_new ();
demux->ffpipe.adapter = gst_adapter_new ();
}
static void
gst_ffmpegdemux_finalize (GObject * object)
{
GstFFMpegDemux *demux;
demux = (GstFFMpegDemux *) object;
g_mutex_free (demux->ffpipe.tlock);
g_cond_free (demux->ffpipe.cond);
gst_object_unref (demux->ffpipe.adapter);
gst_object_unref (demux->task);
g_static_rec_mutex_free (demux->task_lock);
g_free (demux->task_lock);
G_OBJECT_CLASS (parent_class)->finalize (object);
}
static void
@ -436,6 +484,11 @@ gst_ffmpegdemux_perform_seek (GstFFMpegDemux * demux, GstEvent * event)
gboolean update;
GstSegment seeksegment;
if (!demux->seekable) {
GST_DEBUG_OBJECT (demux, "in push mode; ignoring seek");
return FALSE;
}
GST_DEBUG_OBJECT (demux, "starting seek");
if (event) {
@ -1050,7 +1103,10 @@ gst_ffmpegdemux_open (GstFFMpegDemux * demux)
gst_ffmpegdemux_close (demux);
/* open via our input protocol hack */
if (demux->seekable)
location = g_strdup_printf ("gstreamer://%p", demux->sinkpad);
else
location = g_strdup_printf ("gstpipe://%p", &demux->ffpipe);
GST_DEBUG_OBJECT (demux, "about to call av_open_input_file %s", location);
res = av_open_input_file (&demux->context, location,
@ -1178,9 +1234,8 @@ gst_ffmpegdemux_type_find (GstTypeFind * tf, gpointer priv)
/* Task */
static void
gst_ffmpegdemux_loop (GstPad * pad)
gst_ffmpegdemux_loop (GstFFMpegDemux * demux)
{
GstFFMpegDemux *demux;
GstFlowReturn ret;
gint res;
AVPacket pkt;
@ -1192,8 +1247,6 @@ gst_ffmpegdemux_loop (GstPad * pad)
gint outsize;
gboolean rawvideo;
demux = (GstFFMpegDemux *) (GST_PAD_PARENT (pad));
/* open file if we didn't so already */
if (!demux->opened)
if (!gst_ffmpegdemux_open (demux))
@ -1340,7 +1393,19 @@ pause:
GST_LOG_OBJECT (demux, "pausing task, reason %d (%s)", ret,
gst_flow_get_name (ret));
demux->running = FALSE;
if (demux->seekable)
gst_pad_pause_task (demux->sinkpad);
else {
GstFFMpegPipe *ffpipe = &demux->ffpipe;
GST_FFMPEG_PIPE_MUTEX_LOCK (ffpipe);
/* pause task and make sure loop stops */
gst_task_pause (demux->task);
g_static_rec_mutex_lock (demux->task_lock);
g_static_rec_mutex_unlock (demux->task_lock);
demux->ffpipe.srcresult = ret;
GST_FFMPEG_PIPE_MUTEX_UNLOCK (ffpipe);
}
if (GST_FLOW_IS_FATAL (ret) || ret == GST_FLOW_NOT_LINKED) {
if (ret == GST_FLOW_UNEXPECTED) {
@ -1409,6 +1474,127 @@ no_buffer:
}
static gboolean
gst_ffmpegdemux_sink_event (GstPad * sinkpad, GstEvent * event)
{
GstFFMpegDemux *demux;
GstFFMpegPipe *ffpipe;
gboolean result = TRUE;
demux = (GstFFMpegDemux *) (GST_PAD_PARENT (sinkpad));
ffpipe = &(demux->ffpipe);
switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_FLUSH_START:
/* forward event */
gst_pad_event_default (sinkpad, event);
/* now unblock the chain function */
GST_FFMPEG_PIPE_MUTEX_LOCK (ffpipe);
ffpipe->srcresult = GST_FLOW_WRONG_STATE;
GST_FFMPEG_PIPE_SIGNAL (ffpipe);
GST_FFMPEG_PIPE_MUTEX_UNLOCK (ffpipe);
/* loop might run into WRONG_STATE and end itself,
* but may also be waiting in a ffmpeg read
* trying to break that would make ffmpeg believe eos,
* so no harm to have the loop 'pausing' there ... */
goto done;
case GST_EVENT_FLUSH_STOP:
/* forward event */
gst_pad_event_default (sinkpad, event);
GST_FFMPEG_PIPE_MUTEX_LOCK (ffpipe);
gst_adapter_clear (ffpipe->adapter);
ffpipe->srcresult = GST_FLOW_OK;
/* loop may have decided to end itself as a result of flush WRONG_STATE */
gst_task_start (demux->task);
demux->running = TRUE;
demux->flushing = FALSE;
GST_LOG_OBJECT (demux, "loop started");
GST_FFMPEG_PIPE_MUTEX_UNLOCK (ffpipe);
goto done;
case GST_EVENT_EOS:
/* inform the src task that it can stop now */
GST_FFMPEG_PIPE_MUTEX_LOCK (ffpipe);
ffpipe->eos = TRUE;
GST_FFMPEG_PIPE_SIGNAL (ffpipe);
GST_FFMPEG_PIPE_MUTEX_UNLOCK (ffpipe);
/* eat this event for now, task will send eos when finished */
gst_event_unref (event);
goto done;
default:
/* for a serialized event, wait until an earlier data is gone,
* though this is no guarantee as to when task is done with it */
if (GST_EVENT_IS_SERIALIZED (event)) {
GST_FFMPEG_PIPE_MUTEX_LOCK (ffpipe);
while (!ffpipe->needed)
GST_FFMPEG_PIPE_WAIT (ffpipe);
GST_FFMPEG_PIPE_MUTEX_UNLOCK (ffpipe);
}
break;
}
result = gst_pad_event_default (sinkpad, event);
done:
return result;
}
static GstFlowReturn
gst_ffmpegdemux_chain (GstPad * sinkpad, GstBuffer * buffer)
{
GstFFMpegDemux *demux;
GstFFMpegPipe *ffpipe;
demux = (GstFFMpegDemux *) (GST_PAD_PARENT (sinkpad));
ffpipe = &demux->ffpipe;
GST_FFMPEG_PIPE_MUTEX_LOCK (ffpipe);
if (G_UNLIKELY (ffpipe->eos))
goto eos;
if (G_UNLIKELY (ffpipe->srcresult != GST_FLOW_OK))
goto ignore;
gst_adapter_push (ffpipe->adapter, buffer);
buffer = NULL;
while (gst_adapter_available (ffpipe->adapter) >= ffpipe->needed) {
GST_FFMPEG_PIPE_SIGNAL (ffpipe);
GST_FFMPEG_PIPE_WAIT (ffpipe);
/* may have become flushing */
if (G_UNLIKELY (ffpipe->srcresult != GST_FLOW_OK))
goto ignore;
}
GST_FFMPEG_PIPE_MUTEX_UNLOCK (ffpipe);
return GST_FLOW_OK;
/* special cases */
eos:
{
GST_DEBUG_OBJECT (demux, "ignoring buffer at end-of-stream");
GST_FFMPEG_PIPE_MUTEX_UNLOCK (ffpipe);
gst_buffer_unref (buffer);
return GST_FLOW_UNEXPECTED;
}
ignore:
{
GST_DEBUG_OBJECT (demux, "ignoring buffer because src task encountered %s",
gst_flow_get_name (ffpipe->srcresult));
GST_FFMPEG_PIPE_MUTEX_UNLOCK (ffpipe);
if (buffer)
gst_buffer_unref (buffer);
return GST_FLOW_WRONG_STATE;
}
}
static gboolean
gst_ffmpegdemux_sink_activate (GstPad * sinkpad)
{
@ -1428,23 +1614,56 @@ gst_ffmpegdemux_sink_activate (GstPad * sinkpad)
return res;
}
/* push mode:
* - not seekable
* - use gstpipe protocol, like ffmpeg's pipe protocol
* - (independently managed) task driving ffmpeg
*/
static gboolean
gst_ffmpegdemux_sink_activate_push (GstPad * sinkpad, gboolean active)
{
GstFFMpegDemux *demux;
gboolean res;
demux = (GstFFMpegDemux *) (gst_pad_get_parent (sinkpad));
GST_ELEMENT_ERROR (demux, STREAM, NOT_IMPLEMENTED,
(NULL),
("failed to activate sinkpad in pull mode, push mode not implemented yet"));
if (active) {
demux->ffpipe.eos = FALSE;
demux->ffpipe.srcresult = GST_FLOW_OK;
demux->ffpipe.needed = 0;
demux->running = TRUE;
demux->seekable = FALSE;
gst_object_unref (demux);
res = gst_task_start (demux->task);
} else {
GstFFMpegPipe *ffpipe = &demux->ffpipe;
return FALSE;
/* release chain and loop */
GST_FFMPEG_PIPE_MUTEX_LOCK (ffpipe);
demux->ffpipe.srcresult = GST_FLOW_WRONG_STATE;
/* end streaming by making ffmpeg believe eos */
demux->ffpipe.eos = TRUE;
GST_FFMPEG_PIPE_SIGNAL (ffpipe);
GST_FFMPEG_PIPE_MUTEX_UNLOCK (ffpipe);
/* make sure streaming ends */
gst_task_stop (demux->task);
g_static_rec_mutex_lock (demux->task_lock);
g_static_rec_mutex_unlock (demux->task_lock);
res = gst_task_join (demux->task);
demux->running = FALSE;
demux->seekable = FALSE;
}
gst_object_unref (demux);
return res;
}
/* pull mode:
* - seekable
* - use gstreamer protocol, like ffmpeg's file protocol
* - task driving ffmpeg based on sink pad
*/
static gboolean
gst_ffmpegdemux_sink_activate_pull (GstPad * sinkpad, gboolean active)
{
@ -1457,7 +1676,7 @@ gst_ffmpegdemux_sink_activate_pull (GstPad * sinkpad, gboolean active)
demux->running = TRUE;
demux->seekable = TRUE;
res = gst_pad_start_task (sinkpad, (GstTaskFunction) gst_ffmpegdemux_loop,
sinkpad);
demux);
} else {
demux->running = FALSE;
res = gst_pad_stop_task (sinkpad);
@ -1494,6 +1713,7 @@ gst_ffmpegdemux_change_state (GstElement * element, GstStateChange transition)
switch (transition) {
case GST_STATE_CHANGE_PAUSED_TO_READY:
gst_ffmpegdemux_close (demux);
gst_adapter_clear (demux->ffpipe.adapter);
break;
default:
break;

View file

@ -0,0 +1,71 @@
/* GStreamer
* Copyright (C) <2006> Mark Nauwelaerts <manauw@skynet.be>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 59 Temple Place - Suite 330,
* Boston, MA 02111-1307, USA.
*/
#ifndef __GST_FFMPEGPIPE_H__
#define __GST_FFMPEGPIPE_H__
#include <gst/base/gstadapter.h>
G_BEGIN_DECLS
/* pipe protocol helpers */
#define GST_FFMPEG_PIPE_MUTEX_LOCK(m) G_STMT_START { \
GST_LOG_OBJECT (m, "locking tlock from thread %p", g_thread_self ()); \
g_mutex_lock (m->tlock); \
GST_LOG_OBJECT (m, "locked tlock from thread %p", g_thread_self ()); \
} G_STMT_END
#define GST_FFMPEG_PIPE_MUTEX_UNLOCK(m) G_STMT_START { \
GST_LOG_OBJECT (m, "unlocking tlock from thread %p", g_thread_self ()); \
g_mutex_unlock (m->tlock); \
} G_STMT_END
#define GST_FFMPEG_PIPE_WAIT(m) G_STMT_START { \
GST_LOG_OBJECT (m, "thread %p waiting", g_thread_self ()); \
g_cond_wait (m->cond, m->tlock); \
} G_STMT_END
#define GST_FFMPEG_PIPE_SIGNAL(m) G_STMT_START { \
GST_LOG_OBJECT (m, "signalling from thread %p", g_thread_self ()); \
g_cond_signal (m->cond); \
} G_STMT_END
typedef struct _GstFFMpegPipe GstFFMpegPipe;
struct _GstFFMpegPipe
{
/* lock for syncing */
GMutex *tlock;
/* with TLOCK */
/* signals counterpart thread to have a look */
GCond *cond;
/* seen eos */
gboolean eos;
/* flowreturn obtained by src task */
GstFlowReturn srcresult;
/* adpater collecting data */
GstAdapter *adapter;
/* amount needed in adapter by src task */
guint needed;
};
G_END_DECLS
#endif /* __GST_FFMPEGPIPE_H__ */

View file

@ -32,6 +32,7 @@
#include <gst/gst.h>
#include "gstffmpeg.h"
#include "gstffmpegpipe.h"
typedef struct _GstProtocolInfo GstProtocolInfo;
@ -291,3 +292,85 @@ URLProtocol gstreamer_protocol = {
/*.url_seek = */ gst_ffmpegdata_seek,
/*.url_close = */ gst_ffmpegdata_close,
};
/* specialized protocol for cross-thread pushing,
* based on ffmpeg's pipe protocol */
static int
gst_ffmpeg_pipe_open (URLContext * h, const char *filename, int flags)
{
GstFFMpegPipe *ffpipe;
GST_LOG ("Opening %s", filename);
/* we don't support W together */
if (flags != URL_RDONLY) {
GST_WARNING ("Only read-only is supported");
return -EINVAL;
}
if (sscanf (&filename[10], "%p", &ffpipe) != 1) {
GST_WARNING ("could not decode pipe info from %s", filename);
return -EIO;
}
/* sanity check */
g_return_val_if_fail (GST_IS_ADAPTER (ffpipe->adapter), -EINVAL);
h->priv_data = (void *) ffpipe;
h->is_streamed = TRUE;
h->max_packet_size = 0;
return 0;
}
static int
gst_ffmpeg_pipe_read (URLContext * h, unsigned char *buf, int size)
{
GstFFMpegPipe *ffpipe;
const guint8 *data;
guint available;
ffpipe = (GstFFMpegPipe *) h->priv_data;
GST_LOG ("requested size %d", size);
GST_FFMPEG_PIPE_MUTEX_LOCK (ffpipe);
while ((available = gst_adapter_available (ffpipe->adapter)) < size
&& !ffpipe->eos) {
ffpipe->needed = size;
GST_FFMPEG_PIPE_SIGNAL (ffpipe);
GST_FFMPEG_PIPE_WAIT (ffpipe);
}
size = MIN (available, size);
if (size) {
data = gst_adapter_peek (ffpipe->adapter, size);
memcpy (buf, data, size);
gst_adapter_flush (ffpipe->adapter, size);
ffpipe->needed = 0;
}
GST_FFMPEG_PIPE_MUTEX_UNLOCK (ffpipe);
return size;
}
static int
gst_ffmpeg_pipe_close (URLContext * h)
{
GST_LOG ("Closing pipe");
h->priv_data = NULL;
return 0;
}
URLProtocol gstpipe_protocol = {
"gstpipe",
gst_ffmpeg_pipe_open,
gst_ffmpeg_pipe_read,
NULL,
NULL,
gst_ffmpeg_pipe_close,
};