examples/app/: Added an example on how to use appsrc in playbin in streaming mode from an mmapped file.

Original commit message from CVS:
* examples/app/.cvsignore:
* examples/app/Makefile.am:
* examples/app/appsrc-stream.c: (read_data), (start_feed),
(stop_feed), (found_source), (bus_message), (main):
Added an example on how to use appsrc in playbin in streaming mode from
an mmapped file.
* examples/app/appsrc_ex.c: (main):
Set pipeline to NULL to free queued buffers.
* gst-libs/gst/app/gstapp-marshal.list:
* gst-libs/gst/app/gstappsrc.c: (stream_type_get_type), (_do_init),
(gst_app_src_class_init), (gst_app_src_init),
(gst_app_src_flush_queued), (gst_app_src_dispose),
(gst_app_src_set_property), (gst_app_src_get_property),
(gst_app_src_unlock), (gst_app_src_unlock_stop),
(gst_app_src_start), (gst_app_src_stop), (gst_app_src_is_seekable),
(gst_app_src_check_get_range), (gst_app_src_do_seek),
(gst_app_src_create), (gst_app_src_set_stream_type),
(gst_app_src_get_stream_type), (gst_app_src_set_max_bytes),
(gst_app_src_get_max_bytes), (gst_app_src_push_buffer),
(gst_app_src_end_of_stream), (gst_app_src_uri_get_type),
(gst_app_src_uri_get_protocols), (gst_app_src_uri_get_uri),
(gst_app_src_uri_set_uri), (gst_app_src_uri_handler_init):
* gst-libs/gst/app/gstappsrc.h:
Measure max queue size in bytes instead.
Add support for 3 modes of operation, streaming, seekable and
random-access, making basesrc handle the scheduling modes for each.
Add appsrc:// uri handler so that automatic plugging can be done from
playbin2 or uridecodebin, for example.
Added support for custom segment formats.
Add support for push and pull based operations from the application.
Expand the methods so that errors can be detected.
Flush the queued buffers on seeks and when shutting down.
Add signals to inform the app that a seek must happen.
This commit is contained in:
Wim Taymans 2008-06-05 16:38:50 +00:00
parent 71546edc4b
commit 20d64607d4
8 changed files with 673 additions and 117 deletions

2
common

@ -1 +1 @@
Subproject commit 68fb019d4044b9878aef4ca223fc13c19ffc7d0c
Subproject commit 46ec7dfc1c09ff550ed6b7a4e0d3f2b2ac7d3ee8

View file

@ -1 +1,4 @@
VOID:UINT64
VOID:UINT
BOOLEAN:UINT64
ENUM:OBJECT
ENUM:VOID

View file

@ -23,7 +23,7 @@
#endif
#include <gst/gst.h>
#include <gst/base/gstpushsrc.h>
#include <gst/base/gstbasesrc.h>
#include <string.h>
@ -53,17 +53,19 @@ enum
LAST_SIGNAL
};
#define DEFAULT_PROP_MAX_BUFFERS 0
#define DEFAULT_PROP_SIZE -1
#define DEFAULT_PROP_SEEKABLE FALSE
#define DEFAULT_PROP_STREAM_TYPE GST_APP_STREAM_TYPE_STREAM
#define DEFAULT_PROP_MAX_BYTES 200000
#define DEFAULT_PROP_FORMAT GST_FORMAT_BYTES
enum
{
PROP_0,
PROP_CAPS,
PROP_SIZE,
PROP_SEEKABLE,
PROP_MAX_BUFFERS,
PROP_STREAM_TYPE,
PROP_MAX_BYTES,
PROP_FORMAT,
PROP_LAST
};
@ -74,6 +76,28 @@ GST_STATIC_PAD_TEMPLATE ("src",
GST_PAD_ALWAYS,
GST_STATIC_CAPS_ANY);
#define GST_TYPE_APP_STREAM_TYPE (stream_type_get_type ())
static GType
stream_type_get_type (void)
{
static GType stream_type_type = 0;
static const GEnumValue stream_type[] = {
{GST_APP_STREAM_TYPE_STREAM, "Stream", "stream"},
{GST_APP_STREAM_TYPE_SEEKABLE, "Seekable", "seekable"},
{GST_APP_STREAM_TYPE_RANDOM_ACCESS, "Random Access", "random-access"},
{0, NULL, NULL},
};
if (!stream_type_type) {
stream_type_type = g_enum_register_static ("GstAppStreamType", stream_type);
}
return stream_type_type;
}
static void gst_app_src_uri_handler_init (gpointer g_iface,
gpointer iface_data);
static void gst_app_src_dispose (GObject * object);
static void gst_app_src_finalize (GObject * object);
@ -82,15 +106,32 @@ static void gst_app_src_set_property (GObject * object, guint prop_id,
static void gst_app_src_get_property (GObject * object, guint prop_id,
GValue * value, GParamSpec * pspec);
static GstFlowReturn gst_app_src_create (GstPushSrc * psrc, GstBuffer ** buf);
static gboolean gst_app_src_start (GstBaseSrc * psrc);
static gboolean gst_app_src_stop (GstBaseSrc * psrc);
static gboolean gst_app_src_unlock (GstBaseSrc * psrc);
static gboolean gst_app_src_unlock_stop (GstBaseSrc * psrc);
static GstFlowReturn gst_app_src_create (GstBaseSrc * bsrc,
guint64 offset, guint size, GstBuffer ** buf);
static gboolean gst_app_src_start (GstBaseSrc * bsrc);
static gboolean gst_app_src_stop (GstBaseSrc * bsrc);
static gboolean gst_app_src_unlock (GstBaseSrc * bsrc);
static gboolean gst_app_src_unlock_stop (GstBaseSrc * bsrc);
static gboolean gst_app_src_do_seek (GstBaseSrc * src, GstSegment * segment);
static gboolean gst_app_src_is_seekable (GstBaseSrc * src);
static gboolean gst_app_src_check_get_range (GstBaseSrc * src);
static guint gst_app_src_signals[LAST_SIGNAL] = { 0 };
GST_BOILERPLATE (GstAppSrc, gst_app_src, GstPushSrc, GST_TYPE_PUSH_SRC);
static void
_do_init (GType filesrc_type)
{
static const GInterfaceInfo urihandler_info = {
gst_app_src_uri_handler_init,
NULL,
NULL
};
g_type_add_interface_static (filesrc_type, GST_TYPE_URI_HANDLER,
&urihandler_info);
}
GST_BOILERPLATE_FULL (GstAppSrc, gst_app_src, GstBaseSrc, GST_TYPE_BASE_SRC,
_do_init);
static void
gst_app_src_base_init (gpointer g_class)
@ -109,7 +150,6 @@ static void
gst_app_src_class_init (GstAppSrcClass * klass)
{
GObjectClass *gobject_class = (GObjectClass *) klass;
GstPushSrcClass *pushsrc_class = (GstPushSrcClass *) klass;
GstBaseSrcClass *basesrc_class = (GstBaseSrcClass *) klass;
gobject_class->dispose = gst_app_src_dispose;
@ -123,34 +163,46 @@ gst_app_src_class_init (GstAppSrcClass * klass)
"The allowed caps for the src pad", GST_TYPE_CAPS,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_FORMAT,
g_param_spec_enum ("format", "Format",
"The format of the segment events and seek", GST_TYPE_FORMAT,
DEFAULT_PROP_FORMAT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_SIZE,
g_param_spec_int64 ("size", "Size",
"The size of the data stream (-1 if unknown)",
-1, G_MAXINT64, DEFAULT_PROP_SIZE,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_SEEKABLE,
g_param_spec_boolean ("seekable", "Seekable",
"If the source is seekable", DEFAULT_PROP_SEEKABLE,
G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_STREAM_TYPE,
g_param_spec_enum ("stream-type", "Stream Type",
"the type of the stream", GST_TYPE_APP_STREAM_TYPE,
DEFAULT_PROP_STREAM_TYPE, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_MAX_BUFFERS,
g_param_spec_uint ("max-buffers", "Max Buffers",
"The maximum number of buffers to queue internally (0 = unlimited)",
0, G_MAXUINT, DEFAULT_PROP_MAX_BUFFERS,
g_object_class_install_property (gobject_class, PROP_MAX_BYTES,
g_param_spec_uint64 ("max-bytes", "Max bytes",
"The maximum number of bytes to queue internally (0 = unlimited)",
0, G_MAXUINT64, DEFAULT_PROP_MAX_BYTES,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
/**
* GstAppSrc::need-data:
* @appsrc: the appsrc element that emited the signal
* @length: the amount of bytes needed.
*
* Signal that the source needs more data. In the callback you should call
* push-buffer or end-of-stream.
* Signal that the source needs more data. In the callback or from another
* thread you should call push-buffer or end-of-stream.
*
* @length is just a hint and when it is set to -1, any number of bytes can be
* pushed into @appsrc.
*
* You can call push-buffer multiple times until the enough-data signal is
* fired.
*/
gst_app_src_signals[SIGNAL_NEED_DATA] =
g_signal_new ("need-data", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
G_STRUCT_OFFSET (GstAppSrcClass, need_data),
NULL, NULL, g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0, G_TYPE_NONE);
NULL, NULL, gst_app_marshal_VOID__UINT, G_TYPE_NONE, 1, G_TYPE_UINT);
/**
* GstAppSrc::enough-data:
@ -160,10 +212,11 @@ gst_app_src_class_init (GstAppSrcClass * klass)
* application stops calling push-buffer until the need-data signal is
* emited again to avoid excessive buffer queueing.
*/
gst_app_src_signals[SIGNAL_NEED_DATA] =
gst_app_src_signals[SIGNAL_ENOUGH_DATA] =
g_signal_new ("enough-data", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
G_STRUCT_OFFSET (GstAppSrcClass, enough_data),
NULL, NULL, g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0, G_TYPE_NONE);
/**
* GstAppSrc::seek-data:
* @appsrc: the appsrc element that emited the signal
@ -171,29 +224,53 @@ gst_app_src_class_init (GstAppSrcClass * klass)
*
* Seek to the given offset. The next push-buffer should produce buffers from
* the new @offset.
* This callback is only called for seekable stream types.
*
* Returns: %TRUE if the seek succeeded.
*/
gst_app_src_signals[SIGNAL_SEEK_DATA] =
g_signal_new ("seek-data", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
G_STRUCT_OFFSET (GstAppSrcClass, seek_data),
NULL, NULL, gst_app_marshal_VOID__UINT64, G_TYPE_NONE, 1, G_TYPE_UINT64);
NULL, NULL, gst_app_marshal_BOOLEAN__UINT64, G_TYPE_BOOLEAN, 1,
G_TYPE_UINT64);
/**
* GstAppSrc::push-buffer:
* @appsrc: the appsrc
* @buffer: a buffer to push
*
* Adds a buffer to the queue of buffers that the appsrc element will
* push to its source pad. This function will take ownership of @buffer.
*/
gst_app_src_signals[SIGNAL_PUSH_BUFFER] =
g_signal_new ("push-buffer", G_TYPE_FROM_CLASS (klass),
G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstAppSrcClass,
push_buffer), NULL, NULL, g_cclosure_marshal_VOID__OBJECT,
G_TYPE_NONE, 1, GST_TYPE_BUFFER);
push_buffer), NULL, NULL, gst_app_marshal_ENUM__OBJECT,
GST_TYPE_FLOW_RETURN, 1, GST_TYPE_BUFFER);
/**
* GstAppSrc::end-of-stream:
* @appsrc: the appsrc
*
* Notify @appsrc that no more buffer are available.
*/
gst_app_src_signals[SIGNAL_END_OF_STREAM] =
g_signal_new ("end-of-stream", G_TYPE_FROM_CLASS (klass),
G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstAppSrcClass,
end_of_stream), NULL, NULL, g_cclosure_marshal_VOID__VOID,
G_TYPE_NONE, 0, G_TYPE_NONE);
end_of_stream), NULL, NULL, gst_app_marshal_ENUM__VOID,
GST_TYPE_FLOW_RETURN, 0, G_TYPE_NONE);
pushsrc_class->create = gst_app_src_create;
basesrc_class->create = gst_app_src_create;
basesrc_class->start = gst_app_src_start;
basesrc_class->stop = gst_app_src_stop;
basesrc_class->unlock = gst_app_src_unlock;
basesrc_class->unlock_stop = gst_app_src_unlock_stop;
basesrc_class->do_seek = gst_app_src_do_seek;
basesrc_class->is_seekable = gst_app_src_is_seekable;
basesrc_class->check_get_range = gst_app_src_check_get_range;
klass->push_buffer = gst_app_src_push_buffer;
klass->end_of_stream = gst_app_src_end_of_stream;
}
static void
@ -204,8 +281,18 @@ gst_app_src_init (GstAppSrc * appsrc, GstAppSrcClass * klass)
appsrc->queue = g_queue_new ();
appsrc->size = DEFAULT_PROP_SIZE;
appsrc->seekable = DEFAULT_PROP_SEEKABLE;
appsrc->max_buffers = DEFAULT_PROP_MAX_BUFFERS;
appsrc->stream_type = DEFAULT_PROP_STREAM_TYPE;
appsrc->max_bytes = DEFAULT_PROP_MAX_BYTES;
appsrc->format = DEFAULT_PROP_FORMAT;
}
static void
gst_app_src_flush_queued (GstAppSrc * src)
{
GstBuffer *buf;
while ((buf = g_queue_pop_head (src->queue)))
gst_buffer_unref (buf);
}
static void
@ -217,6 +304,7 @@ gst_app_src_dispose (GObject * obj)
gst_caps_unref (appsrc->caps);
appsrc->caps = NULL;
}
gst_app_src_flush_queued (appsrc);
G_OBJECT_CLASS (parent_class)->dispose (obj);
}
@ -246,11 +334,14 @@ gst_app_src_set_property (GObject * object, guint prop_id,
case PROP_SIZE:
gst_app_src_set_size (appsrc, g_value_get_int64 (value));
break;
case PROP_SEEKABLE:
gst_app_src_set_seekable (appsrc, g_value_get_boolean (value));
case PROP_STREAM_TYPE:
gst_app_src_set_stream_type (appsrc, g_value_get_enum (value));
break;
case PROP_MAX_BUFFERS:
gst_app_src_set_max_buffers (appsrc, g_value_get_uint (value));
case PROP_MAX_BYTES:
gst_app_src_set_max_bytes (appsrc, g_value_get_uint64 (value));
break;
case PROP_FORMAT:
appsrc->format = g_value_get_enum (value);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
@ -269,6 +360,7 @@ gst_app_src_get_property (GObject * object, guint prop_id, GValue * value,
{
GstCaps *caps;
/* we're missing a _take_caps() function to transfer ownership */
caps = gst_app_src_get_caps (appsrc);
gst_value_set_caps (value, caps);
if (caps)
@ -278,11 +370,14 @@ gst_app_src_get_property (GObject * object, guint prop_id, GValue * value,
case PROP_SIZE:
g_value_set_int64 (value, gst_app_src_get_size (appsrc));
break;
case PROP_SEEKABLE:
g_value_set_boolean (value, gst_app_src_get_seekable (appsrc));
case PROP_STREAM_TYPE:
g_value_set_enum (value, gst_app_src_get_stream_type (appsrc));
break;
case PROP_MAX_BUFFERS:
g_value_set_uint (value, gst_app_src_get_max_buffers (appsrc));
case PROP_MAX_BYTES:
g_value_set_uint64 (value, gst_app_src_get_max_bytes (appsrc));
break;
case PROP_FORMAT:
g_value_set_enum (value, appsrc->format);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
@ -291,9 +386,9 @@ gst_app_src_get_property (GObject * object, guint prop_id, GValue * value,
}
static gboolean
gst_app_src_unlock (GstBaseSrc * psrc)
gst_app_src_unlock (GstBaseSrc * bsrc)
{
GstAppSrc *appsrc = GST_APP_SRC (psrc);
GstAppSrc *appsrc = GST_APP_SRC (bsrc);
g_mutex_lock (appsrc->mutex);
GST_DEBUG_OBJECT (appsrc, "unlock start");
@ -305,9 +400,9 @@ gst_app_src_unlock (GstBaseSrc * psrc)
}
static gboolean
gst_app_src_unlock_stop (GstBaseSrc * psrc)
gst_app_src_unlock_stop (GstBaseSrc * bsrc)
{
GstAppSrc *appsrc = GST_APP_SRC (psrc);
GstAppSrc *appsrc = GST_APP_SRC (bsrc);
g_mutex_lock (appsrc->mutex);
GST_DEBUG_OBJECT (appsrc, "unlock stop");
@ -319,37 +414,105 @@ gst_app_src_unlock_stop (GstBaseSrc * psrc)
}
static gboolean
gst_app_src_start (GstBaseSrc * psrc)
gst_app_src_start (GstBaseSrc * bsrc)
{
GstAppSrc *appsrc = GST_APP_SRC (psrc);
GstAppSrc *appsrc = GST_APP_SRC (bsrc);
g_mutex_lock (appsrc->mutex);
GST_DEBUG_OBJECT (appsrc, "starting");
appsrc->started = TRUE;
g_mutex_unlock (appsrc->mutex);
gst_base_src_set_format (bsrc, appsrc->format);
return TRUE;
}
static gboolean
gst_app_src_stop (GstBaseSrc * psrc)
gst_app_src_stop (GstBaseSrc * bsrc)
{
GstAppSrc *appsrc = GST_APP_SRC (psrc);
GstAppSrc *appsrc = GST_APP_SRC (bsrc);
g_mutex_lock (appsrc->mutex);
GST_DEBUG_OBJECT (appsrc, "stopping");
appsrc->is_eos = FALSE;
appsrc->flushing = TRUE;
appsrc->started = FALSE;
gst_app_src_flush_queued (appsrc);
g_mutex_unlock (appsrc->mutex);
return TRUE;
}
static GstFlowReturn
gst_app_src_create (GstPushSrc * psrc, GstBuffer ** buf)
static gboolean
gst_app_src_is_seekable (GstBaseSrc * src)
{
GstAppSrc *appsrc = GST_APP_SRC (psrc);
GstAppSrc *appsrc = GST_APP_SRC (src);
gboolean res = FALSE;
switch (appsrc->stream_type) {
case GST_APP_STREAM_TYPE_STREAM:
break;
case GST_APP_STREAM_TYPE_SEEKABLE:
case GST_APP_STREAM_TYPE_RANDOM_ACCESS:
res = TRUE;
break;
}
return res;
}
static gboolean
gst_app_src_check_get_range (GstBaseSrc * src)
{
GstAppSrc *appsrc = GST_APP_SRC (src);
gboolean res = FALSE;
switch (appsrc->stream_type) {
case GST_APP_STREAM_TYPE_STREAM:
case GST_APP_STREAM_TYPE_SEEKABLE:
break;
case GST_APP_STREAM_TYPE_RANDOM_ACCESS:
res = TRUE;
break;
}
return res;
}
/* will be called in push mode */
static gboolean
gst_app_src_do_seek (GstBaseSrc * src, GstSegment * segment)
{
GstAppSrc *appsrc = GST_APP_SRC (src);
gint64 desired_position;
gboolean res = FALSE;
desired_position = segment->last_stop;
GST_DEBUG_OBJECT (appsrc, "seeking to %" G_GINT64_FORMAT ", format %s",
desired_position, gst_format_get_name (segment->format));
/* no need to try to seek in streaming mode */
if (appsrc->stream_type == GST_APP_STREAM_TYPE_STREAM)
return TRUE;
g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_SEEK_DATA], 0,
desired_position, &res);
if (res) {
GST_DEBUG_OBJECT (appsrc, "flushing queue");
gst_app_src_flush_queued (appsrc);
} else {
GST_WARNING_OBJECT (appsrc, "seek failed");
}
return res;
}
static GstFlowReturn
gst_app_src_create (GstBaseSrc * bsrc, guint64 offset, guint size,
GstBuffer ** buf)
{
GstAppSrc *appsrc = GST_APP_SRC (bsrc);
GstFlowReturn ret;
g_mutex_lock (appsrc->mutex);
@ -360,13 +523,31 @@ gst_app_src_create (GstPushSrc * psrc, GstBuffer ** buf)
/* return data as long as we have some */
if (!g_queue_is_empty (appsrc->queue)) {
again:
*buf = g_queue_pop_head (appsrc->queue);
appsrc->queued_bytes -= GST_BUFFER_SIZE (*buf);
gst_buffer_set_caps (*buf, appsrc->caps);
GST_DEBUG_OBJECT (appsrc, "we have buffer %p", *buf);
ret = GST_FLOW_OK;
break;
} else {
/* we have no data, we need some. We fire the signal with the size hint. */
g_mutex_unlock (appsrc->mutex);
g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_NEED_DATA], 0, size,
NULL);
g_mutex_lock (appsrc->mutex);
/* we can be flushing now because we released the lock */
if (appsrc->flushing)
goto flushing;
/* if we have a buffer now, retry to return it */
if (!g_queue_is_empty (appsrc->queue))
goto again;
}
/* check EOS */
@ -395,7 +576,6 @@ eos:
}
}
/* external API */
/**
@ -499,68 +679,70 @@ gst_app_src_get_size (GstAppSrc * appsrc)
}
/**
* gst_app_src_set_seekable:
* gst_app_src_set_stream_type:
* @appsrc: a #GstAppSrc
* @seekable: the new state
* @type: the new state
*
* Set whether the data is seekable. When this flag is set to %TRUE, the
* "seek" signal must be connected to.
* Set the stream type on @appsrc. For seekable streams, the "seek" signal must
* be connected to.
*
* A stream_type stream
*/
void
gst_app_src_set_seekable (GstAppSrc * appsrc, gboolean seekable)
gst_app_src_set_stream_type (GstAppSrc * appsrc, GstAppStreamType type)
{
g_return_if_fail (appsrc != NULL);
g_return_if_fail (GST_IS_APP_SRC (appsrc));
GST_OBJECT_LOCK (appsrc);
GST_DEBUG_OBJECT (appsrc, "setting seekable of %d", seekable);
appsrc->seekable = seekable;
GST_DEBUG_OBJECT (appsrc, "setting stream_type of %d", type);
appsrc->stream_type = type;
GST_OBJECT_UNLOCK (appsrc);
}
/**
* gst_app_src_get_seekable:
* gst_app_src_get_stream_type:
* @appsrc: a #GstAppSrc
*
* Get whether the stream is seekable. Control the seeking behaviour of the
* stream with gst_app_src_set_seekable().
* Get the stream type. Control the stream type of @appsrc
* with gst_app_src_set_stream_type().
*
* Returns: %TRUE if the stream is seekable.
* Returns: the stream type.
*/
gboolean
gst_app_src_get_seekable (GstAppSrc * appsrc)
GstAppStreamType
gst_app_src_get_stream_type (GstAppSrc * appsrc)
{
gboolean seekable;
gboolean stream_type;
g_return_val_if_fail (appsrc != NULL, FALSE);
g_return_val_if_fail (GST_IS_APP_SRC (appsrc), FALSE);
GST_OBJECT_LOCK (appsrc);
seekable = appsrc->seekable;
GST_DEBUG_OBJECT (appsrc, "getting seekable of %d", seekable);
stream_type = appsrc->stream_type;
GST_DEBUG_OBJECT (appsrc, "getting stream_type of %d", stream_type);
GST_OBJECT_UNLOCK (appsrc);
return seekable;
return stream_type;
}
/**
* gst_app_src_set_max_buffers:
* gst_app_src_set_max_bytes:
* @appsrc: a #GstAppSrc
* @max: the maximum number of buffers to queue
* @max: the maximum number of bytes to queue
*
* Set the maximum amount of buffers that can be queued in @appsrc.
* After the maximum amount of buffers are queued, @appsrc will emit the
* Set the maximum amount of bytes that can be queued in @appsrc.
* After the maximum amount of bytes are queued, @appsrc will emit the
* "enough-data" signal.
*/
void
gst_app_src_set_max_buffers (GstAppSrc * appsrc, guint max)
gst_app_src_set_max_bytes (GstAppSrc * appsrc, guint64 max)
{
g_return_if_fail (GST_IS_APP_SRC (appsrc));
g_mutex_lock (appsrc->mutex);
if (max != appsrc->max_buffers) {
GST_DEBUG_OBJECT (appsrc, "setting max-buffers to %u", max);
appsrc->max_buffers = max;
if (max != appsrc->max_bytes) {
GST_DEBUG_OBJECT (appsrc, "setting max-bytes to %u", max);
appsrc->max_bytes = max;
/* signal the change */
g_cond_signal (appsrc->cond);
}
@ -568,23 +750,23 @@ gst_app_src_set_max_buffers (GstAppSrc * appsrc, guint max)
}
/**
* gst_app_src_get_max_buffers:
* gst_app_src_get_max_bytes:
* @appsrc: a #GstAppSrc
*
* Get the maximum amount of buffers that can be queued in @appsrc.
* Get the maximum amount of bytes that can be queued in @appsrc.
*
* Returns: The maximum amount of buffers that can be queued.
* Returns: The maximum amount of bytes that can be queued.
*/
guint
gst_app_src_get_max_buffers (GstAppSrc * appsrc)
guint64
gst_app_src_get_max_bytes (GstAppSrc * appsrc)
{
guint result;
g_return_val_if_fail (GST_IS_APP_SRC (appsrc), 0);
g_mutex_lock (appsrc->mutex);
result = appsrc->max_buffers;
GST_DEBUG_OBJECT (appsrc, "getting max-buffers of %u", result);
result = appsrc->max_bytes;
GST_DEBUG_OBJECT (appsrc, "getting max-bytes of %u", result);
g_mutex_unlock (appsrc->mutex);
return result;
@ -592,42 +774,138 @@ gst_app_src_get_max_buffers (GstAppSrc * appsrc)
/**
* gst_app_src_push_buffer:
* @appsrc:
* @buffer:
* @appsrc: a #GstAppSrc
* @buffer: a #GstBuffer to push
*
* Adds a buffer to the queue of buffers that the appsrc element will
* push to its source pad. This function takes ownership of the buffer.
*
* Returns: #GST_FLOW_OK when the buffer was successfuly queued.
* #GST_FLOW_WRONG_STATE when @appsrc is not PAUSED or PLAYING.
* #GST_FLOW_UNEXPECTED when EOS occured.
*/
void
GstFlowReturn
gst_app_src_push_buffer (GstAppSrc * appsrc, GstBuffer * buffer)
{
g_return_if_fail (appsrc);
g_return_if_fail (GST_IS_APP_SRC (appsrc));
g_return_if_fail (GST_IS_BUFFER (buffer));
g_return_val_if_fail (appsrc, GST_FLOW_ERROR);
g_return_val_if_fail (GST_IS_APP_SRC (appsrc), GST_FLOW_ERROR);
g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
g_mutex_lock (appsrc->mutex);
/* can't accept buffers when we are flushing or EOS */
if (appsrc->flushing)
goto flushing;
if (appsrc->is_eos)
goto eos;
GST_DEBUG_OBJECT (appsrc, "queueing buffer %p", buffer);
g_queue_push_tail (appsrc->queue, buffer);
appsrc->queued_bytes += GST_BUFFER_SIZE (buffer);
if (appsrc->queued_bytes >= appsrc->max_bytes) {
GST_DEBUG_OBJECT (appsrc, "queue filled (%u >= %u), signal enough-data",
appsrc->queued_bytes, appsrc->max_bytes);
g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_ENOUGH_DATA], 0, NULL);
}
g_cond_signal (appsrc->cond);
g_mutex_unlock (appsrc->mutex);
return GST_FLOW_OK;
/* ERRORS */
flushing:
{
GST_DEBUG_OBJECT (appsrc, "refuse buffer %p, we are flushing", buffer);
gst_buffer_unref (buffer);
return GST_FLOW_WRONG_STATE;
}
eos:
{
GST_DEBUG_OBJECT (appsrc, "refuse buffer %p, we are EOS", buffer);
gst_buffer_unref (buffer);
return GST_FLOW_UNEXPECTED;
}
}
/**
* gst_app_src_end_of_stream:
* @appsrc:
* @appsrc: a #GstAppSrc
*
* Indicates to the appsrc element that the last buffer queued in the
* element is the last buffer of the stream.
*
* Returns: #GST_FLOW_OK when the EOS was successfuly queued.
* #GST_FLOW_WRONG_STATE when @appsrc is not PAUSED or PLAYING.
*/
void
GstFlowReturn
gst_app_src_end_of_stream (GstAppSrc * appsrc)
{
g_return_if_fail (appsrc);
g_return_if_fail (GST_IS_APP_SRC (appsrc));
g_return_val_if_fail (appsrc, GST_FLOW_ERROR);
g_return_val_if_fail (GST_IS_APP_SRC (appsrc), GST_FLOW_ERROR);
g_mutex_lock (appsrc->mutex);
/* can't accept buffers when we are flushing. We can accept them when we are
* EOS although it will not do anything. */
if (appsrc->flushing)
goto flushing;
GST_DEBUG_OBJECT (appsrc, "sending EOS");
appsrc->is_eos = TRUE;
g_cond_signal (appsrc->cond);
g_mutex_unlock (appsrc->mutex);
return GST_FLOW_OK;
/* ERRORS */
flushing:
{
GST_DEBUG_OBJECT (appsrc, "refuse EOS, we are flushing");
return GST_FLOW_WRONG_STATE;
}
}
/*** GSTURIHANDLER INTERFACE *************************************************/
static GstURIType
gst_app_src_uri_get_type (void)
{
return GST_URI_SRC;
}
static gchar **
gst_app_src_uri_get_protocols (void)
{
static gchar *protocols[] = { "appsrc", NULL };
return protocols;
}
static const gchar *
gst_app_src_uri_get_uri (GstURIHandler * handler)
{
return "appsrc";
}
static gboolean
gst_app_src_uri_set_uri (GstURIHandler * handler, const gchar * uri)
{
gchar *protocol;
gboolean ret;
protocol = gst_uri_get_protocol (uri);
ret = !strcmp (protocol, "appsrc");
g_free (protocol);
return ret;
}
static void
gst_app_src_uri_handler_init (gpointer g_iface, gpointer iface_data)
{
GstURIHandlerInterface *iface = (GstURIHandlerInterface *) g_iface;
iface->get_type = gst_app_src_uri_get_type;
iface->get_protocols = gst_app_src_uri_get_protocols;
iface->get_uri = gst_app_src_uri_get_uri;
iface->set_uri = gst_app_src_uri_set_uri;
}

View file

@ -39,9 +39,27 @@ G_BEGIN_DECLS
typedef struct _GstAppSrc GstAppSrc;
typedef struct _GstAppSrcClass GstAppSrcClass;
/**
* GstAppStreamType:
* @GST_APP_STREAM_TYPE_STREAM: No seeking is supported in the stream, such as a
* live stream.
* @GST_APP_STREAM_TYPE_SEEKABLE: The stream is seekable but seeking might not
* be very fast, such as data from a webserver.
* @GST_APP_STREAM_TYPE_RANDOM_ACCESS: The stream is seekable and seeking is fast,
* such as in a local file.
*
* The stream type.
*/
typedef enum
{
GST_APP_STREAM_TYPE_STREAM,
GST_APP_STREAM_TYPE_SEEKABLE,
GST_APP_STREAM_TYPE_RANDOM_ACCESS
} GstAppStreamType;
struct _GstAppSrc
{
GstPushSrc pushsrc;
GstBaseSrc basesrc;
/*< private >*/
GCond *cond;
@ -50,26 +68,29 @@ struct _GstAppSrc
GstCaps *caps;
gint64 size;
gboolean seekable;
guint max_buffers;
GstAppStreamType stream_type;
guint64 max_bytes;
GstFormat format;
gboolean flushing;
gboolean started;
gboolean is_eos;
guint64 queued_bytes;
GstAppStreamType current_type;
};
struct _GstAppSrcClass
{
GstPushSrcClass pushsrc_class;
GstBaseSrcClass basesrc_class;
/* signals */
void (*need_data) (GstAppSrc *src);
void (*need_data) (GstAppSrc *src, guint length);
void (*enough_data) (GstAppSrc *src);
gboolean (*seek_data) (GstAppSrc *src, guint64 offset);
/* actions */
void (*push_buffer) (GstAppSrc *src, GstBuffer *buffer);
void (*end_of_stream) (GstAppSrc *src);
GstFlowReturn (*push_buffer) (GstAppSrc *src, GstBuffer *buffer);
GstFlowReturn (*end_of_stream) (GstAppSrc *src);
};
GType gst_app_src_get_type(void);
@ -82,14 +103,14 @@ GstCaps* gst_app_src_get_caps (GstAppSrc *appsrc);
void gst_app_src_set_size (GstAppSrc *appsrc, gint64 size);
gint64 gst_app_src_get_size (GstAppSrc *appsrc);
void gst_app_src_set_seekable (GstAppSrc *appsrc, gboolean seekable);
gboolean gst_app_src_get_seekable (GstAppSrc *appsrc);
void gst_app_src_set_stream_type (GstAppSrc *appsrc, GstAppStreamType type);
GstAppStreamType gst_app_src_get_stream_type (GstAppSrc *appsrc);
void gst_app_src_set_max_buffers (GstAppSrc *appsrc, guint max);
guint gst_app_src_get_max_buffers (GstAppSrc *appsrc);
void gst_app_src_set_max_bytes (GstAppSrc *appsrc, guint64 max);
guint64 gst_app_src_get_max_bytes (GstAppSrc *appsrc);
void gst_app_src_push_buffer (GstAppSrc *appsrc, GstBuffer *buffer);
void gst_app_src_end_of_stream (GstAppSrc *appsrc);
GstFlowReturn gst_app_src_push_buffer (GstAppSrc *appsrc, GstBuffer *buffer);
GstFlowReturn gst_app_src_end_of_stream (GstAppSrc *appsrc);
G_END_DECLS

View file

@ -1 +1,2 @@
appsrc_ex
appsrc-stream

View file

@ -1,5 +1,5 @@
noinst_PROGRAMS = appsrc_ex
noinst_PROGRAMS = appsrc_ex appsrc-stream
appsrc_ex_SOURCES = appsrc_ex.c
appsrc_ex_CFLAGS = $(GST_CFLAGS) $(GCONF_CFLAGS)
@ -7,3 +7,6 @@ appsrc_ex_LDFLAGS = \
$(GST_LIBS) \
$(top_builddir)/gst-libs/gst/app/libgstapp-@GST_MAJORMINOR@.la
appsrc_stream_SOURCES = appsrc-stream.c
appsrc_stream_CFLAGS = $(GST_CFLAGS) $(GCONF_CFLAGS)
appsrc_stream_LDFLAGS = $(GST_LIBS)

View file

@ -0,0 +1,248 @@
/* GStreamer
*
* appsrc-stream.c: example for using appsrc in streaming mode.
*
* Copyright (C) 2008 Wim Taymans <wim.taymans@gmail.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 59 Temple Place - Suite 330,
* Boston, MA 02111-1307, USA.
*/
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#include <gst/gst.h>
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
GST_DEBUG_CATEGORY (appsrc_playbin_debug);
#define GST_CAT_DEFAULT appsrc_playbin_debug
/**
* an example application of using appsrc in streaming push mode. We simply push
* buffers into appsrc. The size of the buffers we push can be any size we
* choose.
*
* This example is very close to how one would deal with a streaming webserver
* that does not support range requests or does not report the total file size.
*
* Some optimisations are done so that we don't push too much data. We connect
* to the need-data and enough-data signals to start/stop sending buffers.
*
* Appsrc in streaming mode (the default) does not support seeking so we don't
* have to handle any seek callbacks.
*
* Some formats are able to estimate the duration of the media file based on the
* file length (mp3, mpeg,..), others report an unknown length (ogg,..).
*/
typedef struct _App App;
struct _App
{
GstElement *playbin;
GstElement *appsrc;
GMainLoop *loop;
guint sourceid;
GMappedFile *file;
guint8 *data;
gsize length;
guint64 offset;
};
App s_app;
#define CHUNK_SIZE 4096
/* This method is called by the idle GSource in the mainloop. We feed CHUNK_SIZE
* bytes into appsrc.
* The ide handler is added to the mainloop when appsrc requests us to start
* sending data (need-data signal) and is removed when appsrc has enough data
* (enough-data signal).
*/
static gboolean
read_data (App * app)
{
GstBuffer *buffer;
guint len;
GstFlowReturn ret;
buffer = gst_buffer_new ();
if (app->offset >= app->length) {
/* we are EOS, send end-of-stream and remove the source */
g_signal_emit_by_name (app->appsrc, "end-of-stream", &ret);
return FALSE;
}
/* read the next chunk */
len = CHUNK_SIZE;
if (app->offset + len > app->length)
len = app->length - app->offset;
GST_BUFFER_DATA (buffer) = app->data + app->offset;
GST_BUFFER_SIZE (buffer) = len;
GST_DEBUG ("feed buffer %p, offset %" G_GUINT64_FORMAT "-%u", buffer,
app->offset, len);
g_signal_emit_by_name (app->appsrc, "push-buffer", buffer, &ret);
if (ret != GST_FLOW_OK) {
/* some error, stop sending data */
return FALSE;
}
app->offset += len;
return TRUE;
}
/* This signal callback is called when appsrc needs data, we add an idle handler
* to the mainloop to start pushing data into the appsrc */
static void
start_feed (GstElement * playbin, guint size, App * app)
{
if (app->sourceid == 0) {
GST_DEBUG ("start feeding");
app->sourceid = g_idle_add ((GSourceFunc) read_data, app);
}
}
/* This callback is called when appsrc has enough data and we can stop sending.
* We remove the idle handler from the mainloop */
static void
stop_feed (GstElement * playbin, App * app)
{
if (app->sourceid != 0) {
GST_DEBUG ("stop feeding");
g_source_remove (app->sourceid);
app->sourceid = 0;
}
}
/* this callback is called when playbin2 has constructed a source object to read
* from. Since we provided the appsrc:// uri to playbin2, this will be the
* appsrc that we must handle. We set up some signals to start and stop pushing
* data into appsrc */
static void
found_source (GObject * playbin, GParamSpec * pspec, App * app)
{
/* get a handle to the appsrc */
g_object_get (playbin, pspec->name, &app->appsrc, NULL);
GST_DEBUG ("got appsrc %p", app->appsrc);
/* we can set the length in appsrc. This allows some elements to estimate the
* total duration of the stream. It's a good idea to set the property when you
* can but it's not required. */
g_object_set (app->appsrc, "size", app->length, NULL);
/* configure the appsrc, we will push data into the appsrc from the
* mainloop. */
g_signal_connect (app->appsrc, "need-data", G_CALLBACK (start_feed), app);
g_signal_connect (app->appsrc, "enough-data", G_CALLBACK (stop_feed), app);
}
static gboolean
bus_message (GstBus * bus, GstMessage * message, App * app)
{
GST_DEBUG ("got message %s",
gst_message_type_get_name (GST_MESSAGE_TYPE (message)));
switch (GST_MESSAGE_TYPE (message)) {
case GST_MESSAGE_ERROR:
g_error ("received error");
g_main_loop_quit (app->loop);
break;
case GST_MESSAGE_EOS:
g_main_loop_quit (app->loop);
break;
default:
break;
}
return TRUE;
}
int
main (int argc, char *argv[])
{
App *app = &s_app;
GError *error = NULL;
GstBus *bus;
gst_init (&argc, &argv);
GST_DEBUG_CATEGORY_INIT (appsrc_playbin_debug, "appsrc-playbin", 0,
"appsrc playbin example");
if (argc < 2) {
g_print ("usage: %s <filename>\n", argv[0]);
return -1;
}
/* try to open the file as an mmapped file */
app->file = g_mapped_file_new (argv[1], FALSE, &error);
if (error) {
g_print ("failed to open file: %s\n", error->message);
g_error_free (error);
return -2;
}
/* get some vitals, this will be used to read data from the mmapped file and
* feed it to appsrc. */
app->length = g_mapped_file_get_length (app->file);
app->data = (guint8 *) g_mapped_file_get_contents (app->file);
app->offset = 0;
/* create a mainloop to get messages and to handle the idle handler that will
* feed data to appsrc. */
app->loop = g_main_loop_new (NULL, TRUE);
app->playbin = gst_element_factory_make ("playbin2", NULL);
g_assert (app->playbin);
bus = gst_pipeline_get_bus (GST_PIPELINE (app->playbin));
/* add watch for messages */
gst_bus_add_watch (bus, (GstBusFunc) bus_message, app);
/* set to read from appsrc */
g_object_set (app->playbin, "uri", "appsrc://", NULL);
/* get notification when the source is created so that we get a handle to it
* and can configure it */
g_signal_connect (app->playbin, "notify::source", (GCallback) found_source,
app);
/* go to playing and wait in a mainloop. */
gst_element_set_state (app->playbin, GST_STATE_PLAYING);
/* this mainloop is stopped when we receive an error or EOS */
g_main_loop_run (app->loop);
GST_DEBUG ("stopping");
gst_element_set_state (app->playbin, GST_STATE_NULL);
/* free the file */
g_mapped_file_free (app->file);
gst_object_unref (bus);
g_main_loop_unref (app->loop);
return 0;
}

View file

@ -62,8 +62,8 @@ main (int argc, char *argv[])
data = malloc (100);
memset (data, i, 100);
printf ("%d: creating buffer for pointer %p\n", i, data);
buf = gst_app_buffer_new (data, 100, dont_eat_my_chicken_wings, data);
printf ("%d: creating buffer for pointer %p, %p\n", i, data, buf);
gst_app_src_push_buffer (GST_APP_SRC (app->src), buf);
}
@ -73,8 +73,10 @@ main (int argc, char *argv[])
GstBuffer *buf;
buf = gst_app_sink_pull_buffer (GST_APP_SINK (app->sink));
printf ("retrieved buffer %p\n", buf);
gst_buffer_unref (buf);
}
gst_element_set_state (app->pipe, GST_STATE_NULL);
return 0;
}