mirror of
https://gitlab.freedesktop.org/gstreamer/gstreamer.git
synced 2024-11-18 15:51:11 +00:00
2e5908d33f
Don't hold the private appsrc lock while pushing out a segment event, which may block indefinitely. Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/4951>
3139 lines
93 KiB
C
3139 lines
93 KiB
C
/* GStreamer
|
|
* Copyright (C) 2007 David Schleef <ds@schleef.org>
|
|
* (C) 2008 Wim Taymans <wim.taymans@gmail.com>
|
|
*
|
|
* This library is free software; you can redistribute it and/or
|
|
* modify it under the terms of the GNU Library General Public
|
|
* License as published by the Free Software Foundation; either
|
|
* version 2 of the License, or (at your option) any later version.
|
|
*
|
|
* This library is distributed in the hope that it will be useful,
|
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
|
* Library General Public License for more details.
|
|
*
|
|
* You should have received a copy of the GNU Library General Public
|
|
* License along with this library; if not, write to the
|
|
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
|
|
* Boston, MA 02110-1301, USA.
|
|
*/
|
|
/**
|
|
* SECTION:gstappsrc
|
|
* @title: GstAppSrc
|
|
* @short_description: Easy way for applications to inject buffers into a
|
|
* pipeline
|
|
* @see_also: #GstBaseSrc, appsink
|
|
*
|
|
* The appsrc element can be used by applications to insert data into a
|
|
* GStreamer pipeline. Unlike most GStreamer elements, appsrc provides
|
|
* external API functions.
|
|
*
|
|
* appsrc can be used by linking with the libgstapp library to access the
|
|
* methods directly or by using the appsrc action signals.
|
|
*
|
|
* Before operating appsrc, the caps property must be set to fixed caps
|
|
* describing the format of the data that will be pushed with appsrc. An
|
|
* exception to this is when pushing buffers with unknown caps, in which case no
|
|
* caps should be set. This is typically true of file-like sources that push raw
|
|
* byte buffers. If you don't want to explicitly set the caps, you can use
|
|
* gst_app_src_push_sample(). This method gets the caps associated with the
|
|
* sample and sets them on the appsrc replacing any previously set caps (if
|
|
* different from sample's caps).
|
|
*
|
|
* The main way of handing data to the appsrc element is by calling the
|
|
* gst_app_src_push_buffer() method or by emitting the push-buffer action signal.
|
|
* This will put the buffer onto a queue which appsrc will read from in its
|
|
* streaming thread. It is important to note that data transport will not happen
|
|
* from the thread that performed the push-buffer call.
|
|
*
|
|
* The "max-bytes", "max-buffers" and "max-time" properties control how much
|
|
* data can be queued in appsrc before appsrc considers the queue full. A
|
|
* filled internal queue will always signal the "enough-data" signal, which
|
|
* signals the application that it should stop pushing data into appsrc. The
|
|
* "block" property will cause appsrc to block the push-buffer method until
|
|
* free space becomes available again.
|
|
*
|
|
* When the internal queue is running out of data, the "need-data" signal is
|
|
* emitted, which signals the application that it should start pushing more data
|
|
* into appsrc.
|
|
*
|
|
* In addition to the "need-data" and "enough-data" signals, appsrc can emit the
|
|
* "seek-data" signal when the "stream-mode" property is set to "seekable" or
|
|
* "random-access". The signal argument will contain the new desired position in
|
|
* the stream expressed in the unit set with the "format" property. After
|
|
* receiving the seek-data signal, the application should push-buffers from the
|
|
* new position.
|
|
*
|
|
* These signals allow the application to operate the appsrc in two different
|
|
* ways:
|
|
*
|
|
* The push mode, in which the application repeatedly calls the push-buffer/push-sample
|
|
* method with a new buffer/sample. Optionally, the queue size in the appsrc
|
|
* can be controlled with the enough-data and need-data signals by respectively
|
|
* stopping/starting the push-buffer/push-sample calls. This is a typical
|
|
* mode of operation for the stream-type "stream" and "seekable". Use this
|
|
* mode when implementing various network protocols or hardware devices.
|
|
*
|
|
* The pull mode, in which the need-data signal triggers the next push-buffer call.
|
|
* This mode is typically used in the "random-access" stream-type. Use this
|
|
* mode for file access or other randomly accessible sources. In this mode, a
|
|
* buffer of exactly the amount of bytes given by the need-data signal should be
|
|
* pushed into appsrc.
|
|
*
|
|
* In all modes, the size property on appsrc should contain the total stream
|
|
* size in bytes. Setting this property is mandatory in the random-access mode.
|
|
* For the stream and seekable modes, setting this property is optional but
|
|
* recommended.
|
|
*
|
|
* When the application has finished pushing data into appsrc, it should call
|
|
* gst_app_src_end_of_stream() or emit the end-of-stream action signal. After
|
|
* this call, no more buffers can be pushed into appsrc until a flushing seek
|
|
* occurs or the state of the appsrc has gone through READY.
|
|
*/
|
|
|
|
#ifdef HAVE_CONFIG_H
|
|
#include "config.h"
|
|
#endif
|
|
|
|
#include <gst/gst.h>
|
|
#include <gst/base/base.h>
|
|
|
|
#include <string.h>
|
|
|
|
#include "gstappsrc.h"
|
|
|
|
/* Flags describing which thread, if any, is currently waiting on the other.
 * The values are distinct bits so that both conditions can be recorded at
 * the same time. */
typedef enum
{
  /* nobody is waiting */
  NOONE_WAITING = 0,
  /* streaming thread is waiting for application thread */
  STREAM_WAITING = (1 << 0),
  /* application thread is waiting for streaming thread */
  APP_WAITING = (1 << 1),
} GstAppSrcWaitStatus;
|
|
|
|
typedef struct
|
|
{
|
|
GstAppSrcCallbacks callbacks;
|
|
gpointer user_data;
|
|
GDestroyNotify destroy_notify;
|
|
gint ref_count;
|
|
} Callbacks;
|
|
|
|
/* Atomically adds one reference to @callbacks.
 *
 * Returns: the same @callbacks pointer, so the call can be used inline. */
static Callbacks *
callbacks_ref (Callbacks * callbacks)
{
  gint *refs = &callbacks->ref_count;

  g_atomic_int_inc (refs);
  return callbacks;
}
|
|
|
|
static void
|
|
callbacks_unref (Callbacks * callbacks)
|
|
{
|
|
if (!g_atomic_int_dec_and_test (&callbacks->ref_count))
|
|
return;
|
|
|
|
if (callbacks->destroy_notify)
|
|
callbacks->destroy_notify (callbacks->user_data);
|
|
|
|
g_free (callbacks);
|
|
}
|
|
|
|
|
|
struct _GstAppSrcPrivate
|
|
{
|
|
GCond cond;
|
|
GMutex mutex;
|
|
GstQueueArray *queue;
|
|
GstAppSrcWaitStatus wait_status;
|
|
|
|
GstCaps *last_caps;
|
|
GstCaps *current_caps;
|
|
/* last segment received on the input */
|
|
GstSegment last_segment;
|
|
/* currently configured segment for the output */
|
|
GstSegment current_segment;
|
|
/* queue up a segment event based on last_segment before
|
|
* the next buffer of buffer list */
|
|
gboolean pending_custom_segment;
|
|
/* events that have been delayed until either the caps is configured, ensuring
|
|
that no events are sent before CAPS, or buffers are being pushed. */
|
|
GstQueueArray *delayed_events;
|
|
/* if a buffer has been pushed yet */
|
|
gboolean pushed_buffer;
|
|
|
|
/* the next buffer that will be queued needs a discont flag
|
|
* because the previous one was dropped - GST_APP_LEAKY_TYPE_UPSTREAM */
|
|
gboolean need_discont_upstream;
|
|
/* the next buffer that will be dequeued needs a discont flag
|
|
* because the previous one was dropped - GST_APP_LEAKY_TYPE_DOWNSTREAM */
|
|
gboolean need_discont_downstream;
|
|
|
|
gint64 size;
|
|
GstClockTime duration;
|
|
GstAppStreamType stream_type;
|
|
guint64 max_bytes, max_buffers, max_time;
|
|
GstFormat format;
|
|
gboolean block;
|
|
gchar *uri;
|
|
|
|
gboolean flushing;
|
|
gboolean started;
|
|
gboolean is_eos;
|
|
guint64 queued_bytes, queued_buffers;
|
|
/* Used to calculate the current time level */
|
|
GstClockTime last_in_running_time, last_out_running_time;
|
|
/* Updated based on the above whenever they change */
|
|
GstClockTime queued_time;
|
|
guint64 offset;
|
|
GstAppStreamType current_type;
|
|
|
|
guint64 min_latency;
|
|
guint64 max_latency;
|
|
/* Tracks whether the latency message was posted at least once */
|
|
gboolean posted_latency_msg;
|
|
|
|
gboolean emit_signals;
|
|
guint min_percent;
|
|
gboolean handle_segment_change;
|
|
|
|
GstAppLeakyType leaky_type;
|
|
|
|
Callbacks *callbacks;
|
|
};
|
|
|
|
/* File-local debug category; it is initialised in class_init and made the
 * default category for the GST_DEBUG/GST_LOG macros used in this file. */
GST_DEBUG_CATEGORY_STATIC (app_src_debug);
#define GST_CAT_DEFAULT app_src_debug
|
|
|
|
/* Indices into gst_app_src_signals[]. */
enum
{
  /* signals */
  SIGNAL_NEED_DATA,
  SIGNAL_ENOUGH_DATA,
  SIGNAL_SEEK_DATA,

  /* actions */
  SIGNAL_PUSH_BUFFER,
  SIGNAL_END_OF_STREAM,
  SIGNAL_PUSH_SAMPLE,
  SIGNAL_PUSH_BUFFER_LIST,

  /* number of entries; must stay last */
  LAST_SIGNAL
};
|
|
|
|
/* Default values of the GObject properties. Negative literals are
 * parenthesized so the macros expand safely inside larger expressions. */
#define DEFAULT_PROP_SIZE                   (-1)
#define DEFAULT_PROP_STREAM_TYPE            GST_APP_STREAM_TYPE_STREAM
#define DEFAULT_PROP_MAX_BYTES              200000
#define DEFAULT_PROP_MAX_BUFFERS            0
#define DEFAULT_PROP_MAX_TIME               (0 * GST_SECOND)
#define DEFAULT_PROP_FORMAT                 GST_FORMAT_BYTES
#define DEFAULT_PROP_BLOCK                  FALSE
#define DEFAULT_PROP_IS_LIVE                FALSE
#define DEFAULT_PROP_MIN_LATENCY            (-1)
#define DEFAULT_PROP_MAX_LATENCY            (-1)
#define DEFAULT_PROP_EMIT_SIGNALS           TRUE
#define DEFAULT_PROP_MIN_PERCENT            0
#define DEFAULT_PROP_CURRENT_LEVEL_BYTES    0
#define DEFAULT_PROP_CURRENT_LEVEL_BUFFERS  0
#define DEFAULT_PROP_CURRENT_LEVEL_TIME     0
#define DEFAULT_PROP_DURATION               GST_CLOCK_TIME_NONE
#define DEFAULT_PROP_HANDLE_SEGMENT_CHANGE  FALSE
#define DEFAULT_PROP_LEAKY_TYPE             GST_APP_LEAKY_TYPE_NONE
|
|
|
|
/* GObject property identifiers. PROP_0 is the unused id 0; PROP_LAST is
 * the number of entries and must stay last. */
enum
{
  PROP_0,
  PROP_CAPS,
  PROP_SIZE,
  PROP_STREAM_TYPE,
  PROP_MAX_BYTES,
  PROP_MAX_BUFFERS,
  PROP_MAX_TIME,
  PROP_FORMAT,
  PROP_BLOCK,
  PROP_IS_LIVE,
  PROP_MIN_LATENCY,
  PROP_MAX_LATENCY,
  PROP_EMIT_SIGNALS,
  PROP_MIN_PERCENT,
  PROP_CURRENT_LEVEL_BYTES,
  PROP_CURRENT_LEVEL_BUFFERS,
  PROP_CURRENT_LEVEL_TIME,
  PROP_DURATION,
  PROP_HANDLE_SEGMENT_CHANGE,
  PROP_LEAKY_TYPE,
  PROP_LAST
};
|
|
|
|
static GstStaticPadTemplate gst_app_src_template =
|
|
GST_STATIC_PAD_TEMPLATE ("src",
|
|
GST_PAD_SRC,
|
|
GST_PAD_ALWAYS,
|
|
GST_STATIC_CAPS_ANY);
|
|
|
|
static void gst_app_src_uri_handler_init (gpointer g_iface,
|
|
gpointer iface_data);
|
|
|
|
static void gst_app_src_dispose (GObject * object);
|
|
static void gst_app_src_finalize (GObject * object);
|
|
|
|
static void gst_app_src_set_property (GObject * object, guint prop_id,
|
|
const GValue * value, GParamSpec * pspec);
|
|
static void gst_app_src_get_property (GObject * object, guint prop_id,
|
|
GValue * value, GParamSpec * pspec);
|
|
|
|
static gboolean gst_app_src_send_event (GstElement * element, GstEvent * event);
|
|
|
|
static void gst_app_src_set_latencies (GstAppSrc * appsrc,
|
|
gboolean do_min, guint64 min, gboolean do_max, guint64 max);
|
|
|
|
static gboolean gst_app_src_negotiate (GstBaseSrc * basesrc);
|
|
static GstCaps *gst_app_src_internal_get_caps (GstBaseSrc * bsrc,
|
|
GstCaps * filter);
|
|
static GstFlowReturn gst_app_src_create (GstBaseSrc * bsrc, guint64 offset,
|
|
guint size, GstBuffer ** buf);
|
|
static gboolean gst_app_src_start (GstBaseSrc * bsrc);
|
|
static gboolean gst_app_src_stop (GstBaseSrc * bsrc);
|
|
static gboolean gst_app_src_unlock (GstBaseSrc * bsrc);
|
|
static gboolean gst_app_src_unlock_stop (GstBaseSrc * bsrc);
|
|
static gboolean gst_app_src_do_seek (GstBaseSrc * src, GstSegment * segment);
|
|
static gboolean gst_app_src_is_seekable (GstBaseSrc * src);
|
|
static gboolean gst_app_src_do_get_size (GstBaseSrc * src, guint64 * size);
|
|
static gboolean gst_app_src_query (GstBaseSrc * src, GstQuery * query);
|
|
static gboolean gst_app_src_event (GstBaseSrc * src, GstEvent * event);
|
|
|
|
static GstFlowReturn gst_app_src_push_buffer_action (GstAppSrc * appsrc,
|
|
GstBuffer * buffer);
|
|
static GstFlowReturn gst_app_src_push_buffer_list_action (GstAppSrc * appsrc,
|
|
GstBufferList * buffer_list);
|
|
static GstFlowReturn gst_app_src_push_sample_action (GstAppSrc * appsrc,
|
|
GstSample * sample);
|
|
|
|
static guint gst_app_src_signals[LAST_SIGNAL] = { 0 };
|
|
|
|
#define gst_app_src_parent_class parent_class
|
|
G_DEFINE_TYPE_WITH_CODE (GstAppSrc, gst_app_src, GST_TYPE_BASE_SRC,
|
|
G_ADD_PRIVATE (GstAppSrc)
|
|
G_IMPLEMENT_INTERFACE (GST_TYPE_URI_HANDLER, gst_app_src_uri_handler_init));
|
|
|
|
static void
|
|
gst_app_src_class_init (GstAppSrcClass * klass)
|
|
{
|
|
GObjectClass *gobject_class = (GObjectClass *) klass;
|
|
GstElementClass *element_class = (GstElementClass *) klass;
|
|
GstBaseSrcClass *basesrc_class = (GstBaseSrcClass *) klass;
|
|
|
|
GST_DEBUG_CATEGORY_INIT (app_src_debug, "appsrc", 0, "appsrc element");
|
|
|
|
gobject_class->dispose = gst_app_src_dispose;
|
|
gobject_class->finalize = gst_app_src_finalize;
|
|
|
|
gobject_class->set_property = gst_app_src_set_property;
|
|
gobject_class->get_property = gst_app_src_get_property;
|
|
|
|
/**
|
|
* GstAppSrc:caps:
|
|
*
|
|
* The GstCaps that will negotiated downstream and will be put
|
|
* on outgoing buffers.
|
|
*/
|
|
g_object_class_install_property (gobject_class, PROP_CAPS,
|
|
g_param_spec_boxed ("caps", "Caps",
|
|
"The allowed caps for the src pad", GST_TYPE_CAPS,
|
|
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
|
|
/**
|
|
* GstAppSrc:format:
|
|
*
|
|
* The format to use for segment events. When the source is producing
|
|
* timestamped buffers this property should be set to GST_FORMAT_TIME.
|
|
*/
|
|
g_object_class_install_property (gobject_class, PROP_FORMAT,
|
|
g_param_spec_enum ("format", "Format",
|
|
"The format of the segment events and seek", GST_TYPE_FORMAT,
|
|
DEFAULT_PROP_FORMAT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
|
|
/**
|
|
* GstAppSrc:size:
|
|
*
|
|
* The total size in bytes of the data stream. If the total size is known, it
|
|
* is recommended to configure it with this property.
|
|
*/
|
|
g_object_class_install_property (gobject_class, PROP_SIZE,
|
|
g_param_spec_int64 ("size", "Size",
|
|
"The size of the data stream in bytes (-1 if unknown)",
|
|
-1, G_MAXINT64, DEFAULT_PROP_SIZE,
|
|
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
|
|
/**
|
|
* GstAppSrc:stream-type:
|
|
*
|
|
* The type of stream that this source is producing. For seekable streams the
|
|
* application should connect to the seek-data signal.
|
|
*/
|
|
g_object_class_install_property (gobject_class, PROP_STREAM_TYPE,
|
|
g_param_spec_enum ("stream-type", "Stream Type",
|
|
"the type of the stream", GST_TYPE_APP_STREAM_TYPE,
|
|
DEFAULT_PROP_STREAM_TYPE,
|
|
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
|
|
/**
|
|
* GstAppSrc:max-bytes:
|
|
*
|
|
* The maximum amount of bytes that can be queued internally.
|
|
* After the maximum amount of bytes are queued, appsrc will emit the
|
|
* "enough-data" signal.
|
|
*/
|
|
g_object_class_install_property (gobject_class, PROP_MAX_BYTES,
|
|
g_param_spec_uint64 ("max-bytes", "Max bytes",
|
|
"The maximum number of bytes to queue internally (0 = unlimited)",
|
|
0, G_MAXUINT64, DEFAULT_PROP_MAX_BYTES,
|
|
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
|
|
|
|
/**
|
|
* GstAppSrc:max-buffers:
|
|
*
|
|
* The maximum amount of buffers that can be queued internally.
|
|
* After the maximum amount of buffers are queued, appsrc will emit the
|
|
* "enough-data" signal.
|
|
*
|
|
* Since: 1.20
|
|
*/
|
|
g_object_class_install_property (gobject_class, PROP_MAX_BUFFERS,
|
|
g_param_spec_uint64 ("max-buffers", "Max buffers",
|
|
"The maximum number of buffers to queue internally (0 = unlimited)",
|
|
0, G_MAXUINT64, DEFAULT_PROP_MAX_BUFFERS,
|
|
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
|
|
|
|
/**
|
|
* GstAppSrc:max-time:
|
|
*
|
|
* The maximum amount of time that can be queued internally.
|
|
* After the maximum amount of time are queued, appsrc will emit the
|
|
* "enough-data" signal.
|
|
*
|
|
* Since: 1.20
|
|
*/
|
|
g_object_class_install_property (gobject_class, PROP_MAX_TIME,
|
|
g_param_spec_uint64 ("max-time", "Max time",
|
|
"The maximum amount of time to queue internally (0 = unlimited)",
|
|
0, G_MAXUINT64, DEFAULT_PROP_MAX_TIME,
|
|
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
|
|
|
|
/**
|
|
* GstAppSrc:block:
|
|
*
|
|
* When max-bytes are queued and after the enough-data signal has been emitted,
|
|
* block any further push-buffer calls until the amount of queued bytes drops
|
|
* below the max-bytes limit.
|
|
*/
|
|
g_object_class_install_property (gobject_class, PROP_BLOCK,
|
|
g_param_spec_boolean ("block", "Block",
|
|
"Block push-buffer when max-bytes are queued",
|
|
DEFAULT_PROP_BLOCK, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
|
|
|
|
/**
|
|
* GstAppSrc:is-live:
|
|
*
|
|
* Instruct the source to behave like a live source. This includes that it
|
|
* will only push out buffers in the PLAYING state.
|
|
*/
|
|
g_object_class_install_property (gobject_class, PROP_IS_LIVE,
|
|
g_param_spec_boolean ("is-live", "Is Live",
|
|
"Whether to act as a live source",
|
|
DEFAULT_PROP_IS_LIVE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
|
|
/**
|
|
* GstAppSrc:min-latency:
|
|
*
|
|
* The minimum latency of the source. A value of -1 will use the default
|
|
* latency calculations of #GstBaseSrc.
|
|
*/
|
|
g_object_class_install_property (gobject_class, PROP_MIN_LATENCY,
|
|
g_param_spec_int64 ("min-latency", "Min Latency",
|
|
"The minimum latency (-1 = default)",
|
|
-1, G_MAXINT64, DEFAULT_PROP_MIN_LATENCY,
|
|
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
|
|
/**
|
|
* GstAppSrc::max-latency:
|
|
*
|
|
* The maximum latency of the source. A value of -1 means an unlimited amount
|
|
* of latency.
|
|
*/
|
|
g_object_class_install_property (gobject_class, PROP_MAX_LATENCY,
|
|
g_param_spec_int64 ("max-latency", "Max Latency",
|
|
"The maximum latency (-1 = unlimited)",
|
|
-1, G_MAXINT64, DEFAULT_PROP_MAX_LATENCY,
|
|
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
|
|
|
|
/**
|
|
* GstAppSrc:emit-signals:
|
|
*
|
|
* Make appsrc emit the "need-data", "enough-data" and "seek-data" signals.
|
|
* This option is by default enabled for backwards compatibility reasons but
|
|
* can disabled when needed because signal emission is expensive.
|
|
*/
|
|
g_object_class_install_property (gobject_class, PROP_EMIT_SIGNALS,
|
|
g_param_spec_boolean ("emit-signals", "Emit signals",
|
|
"Emit need-data, enough-data and seek-data signals",
|
|
DEFAULT_PROP_EMIT_SIGNALS,
|
|
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
|
|
|
|
/**
|
|
* GstAppSrc:min-percent:
|
|
*
|
|
* Make appsrc emit the "need-data" signal when the amount of bytes in the
|
|
* queue drops below this percentage of max-bytes.
|
|
*/
|
|
g_object_class_install_property (gobject_class, PROP_MIN_PERCENT,
|
|
g_param_spec_uint ("min-percent", "Min Percent",
|
|
"Emit need-data when queued bytes drops below this percent of max-bytes",
|
|
0, 100, DEFAULT_PROP_MIN_PERCENT,
|
|
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
|
|
|
|
/**
|
|
* GstAppSrc:current-level-bytes:
|
|
*
|
|
* The number of currently queued bytes inside appsrc.
|
|
*
|
|
* Since: 1.2
|
|
*/
|
|
g_object_class_install_property (gobject_class, PROP_CURRENT_LEVEL_BYTES,
|
|
g_param_spec_uint64 ("current-level-bytes", "Current Level Bytes",
|
|
"The number of currently queued bytes",
|
|
0, G_MAXUINT64, DEFAULT_PROP_CURRENT_LEVEL_BYTES,
|
|
G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
|
|
|
|
/**
|
|
* GstAppSrc:current-level-buffers:
|
|
*
|
|
* The number of currently queued buffers inside appsrc.
|
|
*
|
|
* Since: 1.20
|
|
*/
|
|
g_object_class_install_property (gobject_class, PROP_CURRENT_LEVEL_BUFFERS,
|
|
g_param_spec_uint64 ("current-level-buffers", "Current Level Buffers",
|
|
"The number of currently queued buffers",
|
|
0, G_MAXUINT64, DEFAULT_PROP_CURRENT_LEVEL_BUFFERS,
|
|
G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
|
|
|
|
/**
|
|
* GstAppSrc:current-level-time:
|
|
*
|
|
* The amount of currently queued time inside appsrc.
|
|
*
|
|
* Since: 1.20
|
|
*/
|
|
g_object_class_install_property (gobject_class, PROP_CURRENT_LEVEL_TIME,
|
|
g_param_spec_uint64 ("current-level-time", "Current Level Time",
|
|
"The amount of currently queued time",
|
|
0, G_MAXUINT64, DEFAULT_PROP_CURRENT_LEVEL_TIME,
|
|
G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
|
|
|
|
/**
|
|
* GstAppSrc:duration:
|
|
*
|
|
* The total duration in nanoseconds of the data stream. If the total duration is known, it
|
|
* is recommended to configure it with this property.
|
|
*
|
|
* Since: 1.10
|
|
*/
|
|
g_object_class_install_property (gobject_class, PROP_DURATION,
|
|
g_param_spec_uint64 ("duration", "Duration",
|
|
"The duration of the data stream in nanoseconds (GST_CLOCK_TIME_NONE if unknown)",
|
|
0, G_MAXUINT64, DEFAULT_PROP_DURATION,
|
|
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
|
|
|
|
/**
|
|
* GstAppSrc:handle-segment-change:
|
|
*
|
|
* When enabled, appsrc will check GstSegment in GstSample which was
|
|
* pushed via gst_app_src_push_sample() or "push-sample" signal action.
|
|
* If a GstSegment is changed, corresponding segment event will be followed
|
|
* by next data flow.
|
|
*
|
|
* FIXME: currently only GST_FORMAT_TIME format is supported and therefore
|
|
* GstAppSrc::format should be time. However, possibly #GstAppSrc can support
|
|
* other formats.
|
|
*
|
|
* Since: 1.18
|
|
*/
|
|
g_object_class_install_property (gobject_class, PROP_HANDLE_SEGMENT_CHANGE,
|
|
g_param_spec_boolean ("handle-segment-change", "Handle Segment Change",
|
|
"Whether to detect and handle changed time format GstSegment in "
|
|
"GstSample. User should set valid GstSegment in GstSample. "
|
|
"Must set format property as \"time\" to enable this property",
|
|
DEFAULT_PROP_HANDLE_SEGMENT_CHANGE,
|
|
G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
|
|
G_PARAM_STATIC_STRINGS));
|
|
|
|
/**
|
|
* GstAppSrc:leaky-type:
|
|
*
|
|
* When set to any other value than GST_APP_LEAKY_TYPE_NONE then the appsrc
|
|
* will drop any buffers that are pushed into it once its internal queue is
|
|
* full. The selected type defines whether to drop the oldest or new
|
|
* buffers.
|
|
*
|
|
* Since: 1.20
|
|
*/
|
|
g_object_class_install_property (gobject_class, PROP_LEAKY_TYPE,
|
|
g_param_spec_enum ("leaky-type", "Leaky Type",
|
|
"Whether to drop buffers once the internal queue is full",
|
|
GST_TYPE_APP_LEAKY_TYPE,
|
|
DEFAULT_PROP_LEAKY_TYPE,
|
|
G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
|
|
G_PARAM_STATIC_STRINGS));
|
|
|
|
/**
|
|
* GstAppSrc::need-data:
|
|
* @appsrc: the appsrc element that emitted the signal
|
|
* @length: the amount of bytes needed.
|
|
*
|
|
* Signal that the source needs more data. In the callback or from another
|
|
* thread you should call push-buffer or end-of-stream.
|
|
*
|
|
* @length is just a hint and when it is set to -1, any number of bytes can be
|
|
* pushed into @appsrc.
|
|
*
|
|
* You can call push-buffer multiple times until the enough-data signal is
|
|
* fired.
|
|
*/
|
|
gst_app_src_signals[SIGNAL_NEED_DATA] =
|
|
g_signal_new ("need-data", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
|
|
G_STRUCT_OFFSET (GstAppSrcClass, need_data),
|
|
NULL, NULL, NULL, G_TYPE_NONE, 1, G_TYPE_UINT);
|
|
|
|
/**
|
|
* GstAppSrc::enough-data:
|
|
* @appsrc: the appsrc element that emitted the signal
|
|
*
|
|
* Signal that the source has enough data. It is recommended that the
|
|
* application stops calling push-buffer until the need-data signal is
|
|
* emitted again to avoid excessive buffer queueing.
|
|
*/
|
|
gst_app_src_signals[SIGNAL_ENOUGH_DATA] =
|
|
g_signal_new ("enough-data", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
|
|
G_STRUCT_OFFSET (GstAppSrcClass, enough_data),
|
|
NULL, NULL, NULL, G_TYPE_NONE, 0, G_TYPE_NONE);
|
|
|
|
/**
|
|
* GstAppSrc::seek-data:
|
|
* @appsrc: the appsrc element that emitted the signal
|
|
* @offset: the offset to seek to
|
|
*
|
|
* Seek to the given offset. The next push-buffer should produce buffers from
|
|
* the new @offset.
|
|
* This callback is only called for seekable stream types.
|
|
*
|
|
* Returns: %TRUE if the seek succeeded.
|
|
*/
|
|
gst_app_src_signals[SIGNAL_SEEK_DATA] =
|
|
g_signal_new ("seek-data", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
|
|
G_STRUCT_OFFSET (GstAppSrcClass, seek_data),
|
|
NULL, NULL, NULL, G_TYPE_BOOLEAN, 1, G_TYPE_UINT64);
|
|
|
|
/**
|
|
* GstAppSrc::push-buffer:
|
|
* @appsrc: the appsrc
|
|
* @buffer: (transfer none): a buffer to push
|
|
*
|
|
* Adds a buffer to the queue of buffers that the appsrc element will
|
|
* push to its source pad.
|
|
*
|
|
* This function does not take ownership of the buffer, but it takes a
|
|
* reference so the buffer can be unreffed at any time after calling this
|
|
* function.
|
|
*
|
|
* When the block property is TRUE, this function can block until free space
|
|
* becomes available in the queue.
|
|
*/
|
|
gst_app_src_signals[SIGNAL_PUSH_BUFFER] =
|
|
g_signal_new ("push-buffer", G_TYPE_FROM_CLASS (klass),
|
|
G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstAppSrcClass,
|
|
push_buffer), NULL, NULL, NULL,
|
|
GST_TYPE_FLOW_RETURN, 1, GST_TYPE_BUFFER);
|
|
|
|
/**
|
|
* GstAppSrc::push-buffer-list:
|
|
* @appsrc: the appsrc
|
|
* @buffer_list: (transfer none): a buffer list to push
|
|
*
|
|
* Adds a buffer list to the queue of buffers and buffer lists that the
|
|
* appsrc element will push to its source pad.
|
|
*
|
|
* This function does not take ownership of the buffer list, but it takes a
|
|
* reference so the buffer list can be unreffed at any time after calling
|
|
* this function.
|
|
*
|
|
* When the block property is TRUE, this function can block until free space
|
|
* becomes available in the queue.
|
|
*
|
|
* Since: 1.14
|
|
*/
|
|
gst_app_src_signals[SIGNAL_PUSH_BUFFER_LIST] =
|
|
g_signal_new ("push-buffer-list", G_TYPE_FROM_CLASS (klass),
|
|
G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstAppSrcClass,
|
|
push_buffer_list), NULL, NULL, NULL,
|
|
GST_TYPE_FLOW_RETURN, 1, GST_TYPE_BUFFER_LIST);
|
|
|
|
/**
|
|
* GstAppSrc::push-sample:
|
|
* @appsrc: the appsrc
|
|
* @sample: (transfer none): a sample from which extract buffer to push
|
|
*
|
|
* Extract a buffer from the provided sample and adds the extracted buffer
|
|
* to the queue of buffers that the appsrc element will
|
|
* push to its source pad. This function set the appsrc caps based on the caps
|
|
* in the sample and reset the caps if they change.
|
|
* Only the caps and the buffer of the provided sample are used and not
|
|
* for example the segment in the sample.
|
|
*
|
|
* This function does not take ownership of the sample, but it takes a
|
|
* reference so the sample can be unreffed at any time after calling this
|
|
* function.
|
|
*
|
|
* When the block property is TRUE, this function can block until free space
|
|
* becomes available in the queue.
|
|
*
|
|
* Since: 1.6
|
|
*/
|
|
gst_app_src_signals[SIGNAL_PUSH_SAMPLE] =
|
|
g_signal_new ("push-sample", G_TYPE_FROM_CLASS (klass),
|
|
G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstAppSrcClass,
|
|
push_sample), NULL, NULL, NULL,
|
|
GST_TYPE_FLOW_RETURN, 1, GST_TYPE_SAMPLE);
|
|
|
|
|
|
/**
|
|
* GstAppSrc::end-of-stream:
|
|
* @appsrc: the appsrc
|
|
*
|
|
* Notify @appsrc that no more buffer are available.
|
|
*/
|
|
gst_app_src_signals[SIGNAL_END_OF_STREAM] =
|
|
g_signal_new ("end-of-stream", G_TYPE_FROM_CLASS (klass),
|
|
G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstAppSrcClass,
|
|
end_of_stream), NULL, NULL, NULL,
|
|
GST_TYPE_FLOW_RETURN, 0, G_TYPE_NONE);
|
|
|
|
gst_element_class_set_static_metadata (element_class, "AppSrc",
|
|
"Generic/Source", "Allow the application to feed buffers to a pipeline",
|
|
"David Schleef <ds@schleef.org>, Wim Taymans <wim.taymans@gmail.com>");
|
|
|
|
gst_element_class_add_static_pad_template (element_class,
|
|
&gst_app_src_template);
|
|
|
|
element_class->send_event = gst_app_src_send_event;
|
|
|
|
basesrc_class->negotiate = gst_app_src_negotiate;
|
|
basesrc_class->get_caps = gst_app_src_internal_get_caps;
|
|
basesrc_class->create = gst_app_src_create;
|
|
basesrc_class->start = gst_app_src_start;
|
|
basesrc_class->stop = gst_app_src_stop;
|
|
basesrc_class->unlock = gst_app_src_unlock;
|
|
basesrc_class->unlock_stop = gst_app_src_unlock_stop;
|
|
basesrc_class->do_seek = gst_app_src_do_seek;
|
|
basesrc_class->is_seekable = gst_app_src_is_seekable;
|
|
basesrc_class->get_size = gst_app_src_do_get_size;
|
|
basesrc_class->query = gst_app_src_query;
|
|
basesrc_class->event = gst_app_src_event;
|
|
|
|
klass->push_buffer = gst_app_src_push_buffer_action;
|
|
klass->push_buffer_list = gst_app_src_push_buffer_list_action;
|
|
klass->push_sample = gst_app_src_push_sample_action;
|
|
klass->end_of_stream = gst_app_src_end_of_stream;
|
|
}
|
|
|
|
static void
|
|
gst_app_src_init (GstAppSrc * appsrc)
|
|
{
|
|
GstAppSrcPrivate *priv;
|
|
|
|
priv = appsrc->priv = gst_app_src_get_instance_private (appsrc);
|
|
|
|
g_mutex_init (&priv->mutex);
|
|
g_cond_init (&priv->cond);
|
|
priv->queue = gst_queue_array_new (16);
|
|
priv->delayed_events = gst_queue_array_new (16);
|
|
priv->wait_status = NOONE_WAITING;
|
|
priv->pushed_buffer = FALSE;
|
|
|
|
priv->size = DEFAULT_PROP_SIZE;
|
|
priv->duration = DEFAULT_PROP_DURATION;
|
|
priv->stream_type = DEFAULT_PROP_STREAM_TYPE;
|
|
priv->max_bytes = DEFAULT_PROP_MAX_BYTES;
|
|
priv->max_buffers = DEFAULT_PROP_MAX_BUFFERS;
|
|
priv->max_time = DEFAULT_PROP_MAX_TIME;
|
|
priv->format = DEFAULT_PROP_FORMAT;
|
|
priv->block = DEFAULT_PROP_BLOCK;
|
|
priv->min_latency = DEFAULT_PROP_MIN_LATENCY;
|
|
priv->max_latency = DEFAULT_PROP_MAX_LATENCY;
|
|
priv->emit_signals = DEFAULT_PROP_EMIT_SIGNALS;
|
|
priv->min_percent = DEFAULT_PROP_MIN_PERCENT;
|
|
priv->handle_segment_change = DEFAULT_PROP_HANDLE_SEGMENT_CHANGE;
|
|
priv->leaky_type = DEFAULT_PROP_LEAKY_TYPE;
|
|
|
|
gst_base_src_set_live (GST_BASE_SRC (appsrc), DEFAULT_PROP_IS_LIVE);
|
|
}
|
|
|
|
/* Must be called with priv->mutex */
|
|
static void
|
|
gst_app_src_flush_queued (GstAppSrc * src, gboolean retain_last_caps)
|
|
{
|
|
GstMiniObject *obj;
|
|
GstAppSrcPrivate *priv = src->priv;
|
|
GstCaps *requeue_caps = NULL;
|
|
|
|
while (!gst_queue_array_is_empty (priv->queue)) {
|
|
obj = gst_queue_array_pop_head (priv->queue);
|
|
if (obj) {
|
|
if (GST_IS_CAPS (obj) && retain_last_caps) {
|
|
gst_caps_replace (&requeue_caps, GST_CAPS_CAST (obj));
|
|
}
|
|
gst_mini_object_unref (obj);
|
|
}
|
|
}
|
|
|
|
if (requeue_caps) {
|
|
gst_queue_array_push_tail (priv->queue, requeue_caps);
|
|
}
|
|
|
|
gst_queue_array_clear (priv->delayed_events);
|
|
priv->pushed_buffer = FALSE;
|
|
|
|
priv->queued_bytes = 0;
|
|
priv->queued_buffers = 0;
|
|
priv->queued_time = 0;
|
|
priv->last_in_running_time = GST_CLOCK_TIME_NONE;
|
|
priv->last_out_running_time = GST_CLOCK_TIME_NONE;
|
|
priv->need_discont_upstream = FALSE;
|
|
priv->need_discont_downstream = FALSE;
|
|
}
|
|
|
|
static void
|
|
gst_app_src_dispose (GObject * obj)
|
|
{
|
|
GstAppSrc *appsrc = GST_APP_SRC_CAST (obj);
|
|
GstAppSrcPrivate *priv = appsrc->priv;
|
|
Callbacks *callbacks = NULL;
|
|
|
|
GST_OBJECT_LOCK (appsrc);
|
|
if (priv->current_caps) {
|
|
gst_caps_unref (priv->current_caps);
|
|
priv->current_caps = NULL;
|
|
}
|
|
if (priv->last_caps) {
|
|
gst_caps_unref (priv->last_caps);
|
|
priv->last_caps = NULL;
|
|
}
|
|
GST_OBJECT_UNLOCK (appsrc);
|
|
|
|
g_mutex_lock (&priv->mutex);
|
|
if (priv->callbacks)
|
|
callbacks = g_steal_pointer (&priv->callbacks);
|
|
gst_app_src_flush_queued (appsrc, FALSE);
|
|
g_mutex_unlock (&priv->mutex);
|
|
|
|
g_clear_pointer (&callbacks, callbacks_unref);
|
|
|
|
G_OBJECT_CLASS (parent_class)->dispose (obj);
|
|
}
|
|
|
|
static void
|
|
gst_app_src_finalize (GObject * obj)
|
|
{
|
|
GstAppSrc *appsrc = GST_APP_SRC_CAST (obj);
|
|
GstAppSrcPrivate *priv = appsrc->priv;
|
|
|
|
g_mutex_clear (&priv->mutex);
|
|
g_cond_clear (&priv->cond);
|
|
gst_queue_array_free (priv->queue);
|
|
gst_queue_array_free (priv->delayed_events);
|
|
|
|
g_free (priv->uri);
|
|
|
|
G_OBJECT_CLASS (parent_class)->finalize (obj);
|
|
}
|
|
|
|
static GstCaps *
|
|
gst_app_src_internal_get_caps (GstBaseSrc * bsrc, GstCaps * filter)
|
|
{
|
|
GstAppSrc *appsrc = GST_APP_SRC (bsrc);
|
|
GstCaps *caps;
|
|
|
|
GST_OBJECT_LOCK (appsrc);
|
|
if ((caps = appsrc->priv->current_caps))
|
|
gst_caps_ref (caps);
|
|
GST_OBJECT_UNLOCK (appsrc);
|
|
|
|
if (filter) {
|
|
if (caps) {
|
|
GstCaps *intersection =
|
|
gst_caps_intersect_full (filter, caps, GST_CAPS_INTERSECT_FIRST);
|
|
gst_caps_unref (caps);
|
|
caps = intersection;
|
|
} else {
|
|
caps = gst_caps_ref (filter);
|
|
}
|
|
}
|
|
|
|
GST_DEBUG_OBJECT (appsrc, "caps: %" GST_PTR_FORMAT, caps);
|
|
return caps;
|
|
}
|
|
|
|
static void
|
|
gst_app_src_set_property (GObject * object, guint prop_id,
|
|
const GValue * value, GParamSpec * pspec)
|
|
{
|
|
GstAppSrc *appsrc = GST_APP_SRC_CAST (object);
|
|
GstAppSrcPrivate *priv = appsrc->priv;
|
|
|
|
switch (prop_id) {
|
|
case PROP_CAPS:
|
|
gst_app_src_set_caps (appsrc, gst_value_get_caps (value));
|
|
break;
|
|
case PROP_SIZE:
|
|
gst_app_src_set_size (appsrc, g_value_get_int64 (value));
|
|
break;
|
|
case PROP_STREAM_TYPE:
|
|
gst_app_src_set_stream_type (appsrc, g_value_get_enum (value));
|
|
break;
|
|
case PROP_MAX_BYTES:
|
|
gst_app_src_set_max_bytes (appsrc, g_value_get_uint64 (value));
|
|
break;
|
|
case PROP_MAX_BUFFERS:
|
|
gst_app_src_set_max_buffers (appsrc, g_value_get_uint64 (value));
|
|
break;
|
|
case PROP_MAX_TIME:
|
|
gst_app_src_set_max_time (appsrc, g_value_get_uint64 (value));
|
|
break;
|
|
case PROP_FORMAT:
|
|
priv->format = g_value_get_enum (value);
|
|
break;
|
|
case PROP_BLOCK:
|
|
priv->block = g_value_get_boolean (value);
|
|
break;
|
|
case PROP_IS_LIVE:
|
|
gst_base_src_set_live (GST_BASE_SRC (appsrc),
|
|
g_value_get_boolean (value));
|
|
break;
|
|
case PROP_MIN_LATENCY:
|
|
gst_app_src_set_latencies (appsrc, TRUE, g_value_get_int64 (value),
|
|
FALSE, -1);
|
|
break;
|
|
case PROP_MAX_LATENCY:
|
|
gst_app_src_set_latencies (appsrc, FALSE, -1, TRUE,
|
|
g_value_get_int64 (value));
|
|
break;
|
|
case PROP_EMIT_SIGNALS:
|
|
gst_app_src_set_emit_signals (appsrc, g_value_get_boolean (value));
|
|
break;
|
|
case PROP_MIN_PERCENT:
|
|
priv->min_percent = g_value_get_uint (value);
|
|
break;
|
|
case PROP_DURATION:
|
|
gst_app_src_set_duration (appsrc, g_value_get_uint64 (value));
|
|
break;
|
|
case PROP_HANDLE_SEGMENT_CHANGE:
|
|
priv->handle_segment_change = g_value_get_boolean (value);
|
|
break;
|
|
case PROP_LEAKY_TYPE:
|
|
priv->leaky_type = g_value_get_enum (value);
|
|
break;
|
|
default:
|
|
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
|
|
break;
|
|
}
|
|
}
|
|
|
|
static void
|
|
gst_app_src_get_property (GObject * object, guint prop_id, GValue * value,
|
|
GParamSpec * pspec)
|
|
{
|
|
GstAppSrc *appsrc = GST_APP_SRC_CAST (object);
|
|
GstAppSrcPrivate *priv = appsrc->priv;
|
|
|
|
switch (prop_id) {
|
|
case PROP_CAPS:
|
|
g_value_take_boxed (value, gst_app_src_get_caps (appsrc));
|
|
break;
|
|
case PROP_SIZE:
|
|
g_value_set_int64 (value, gst_app_src_get_size (appsrc));
|
|
break;
|
|
case PROP_STREAM_TYPE:
|
|
g_value_set_enum (value, gst_app_src_get_stream_type (appsrc));
|
|
break;
|
|
case PROP_MAX_BYTES:
|
|
g_value_set_uint64 (value, gst_app_src_get_max_bytes (appsrc));
|
|
break;
|
|
case PROP_MAX_BUFFERS:
|
|
g_value_set_uint64 (value, gst_app_src_get_max_buffers (appsrc));
|
|
break;
|
|
case PROP_MAX_TIME:
|
|
g_value_set_uint64 (value, gst_app_src_get_max_time (appsrc));
|
|
break;
|
|
case PROP_FORMAT:
|
|
g_value_set_enum (value, priv->format);
|
|
break;
|
|
case PROP_BLOCK:
|
|
g_value_set_boolean (value, priv->block);
|
|
break;
|
|
case PROP_IS_LIVE:
|
|
g_value_set_boolean (value, gst_base_src_is_live (GST_BASE_SRC (appsrc)));
|
|
break;
|
|
case PROP_MIN_LATENCY:
|
|
{
|
|
guint64 min = 0;
|
|
|
|
gst_app_src_get_latency (appsrc, &min, NULL);
|
|
g_value_set_int64 (value, min);
|
|
break;
|
|
}
|
|
case PROP_MAX_LATENCY:
|
|
{
|
|
guint64 max = 0;
|
|
|
|
gst_app_src_get_latency (appsrc, NULL, &max);
|
|
g_value_set_int64 (value, max);
|
|
break;
|
|
}
|
|
case PROP_EMIT_SIGNALS:
|
|
g_value_set_boolean (value, gst_app_src_get_emit_signals (appsrc));
|
|
break;
|
|
case PROP_MIN_PERCENT:
|
|
g_value_set_uint (value, priv->min_percent);
|
|
break;
|
|
case PROP_CURRENT_LEVEL_BYTES:
|
|
g_value_set_uint64 (value, gst_app_src_get_current_level_bytes (appsrc));
|
|
break;
|
|
case PROP_CURRENT_LEVEL_BUFFERS:
|
|
g_value_set_uint64 (value,
|
|
gst_app_src_get_current_level_buffers (appsrc));
|
|
break;
|
|
case PROP_CURRENT_LEVEL_TIME:
|
|
g_value_set_uint64 (value, gst_app_src_get_current_level_time (appsrc));
|
|
break;
|
|
case PROP_DURATION:
|
|
g_value_set_uint64 (value, gst_app_src_get_duration (appsrc));
|
|
break;
|
|
case PROP_HANDLE_SEGMENT_CHANGE:
|
|
g_value_set_boolean (value, priv->handle_segment_change);
|
|
break;
|
|
case PROP_LEAKY_TYPE:
|
|
g_value_set_enum (value, priv->leaky_type);
|
|
break;
|
|
default:
|
|
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
|
|
break;
|
|
}
|
|
}
|
|
|
|
static gboolean
|
|
gst_app_src_send_event (GstElement * element, GstEvent * event)
|
|
{
|
|
GstAppSrc *appsrc = GST_APP_SRC_CAST (element);
|
|
GstAppSrcPrivate *priv = appsrc->priv;
|
|
|
|
switch (GST_EVENT_TYPE (event)) {
|
|
case GST_EVENT_FLUSH_STOP:
|
|
g_mutex_lock (&priv->mutex);
|
|
gst_app_src_flush_queued (appsrc, TRUE);
|
|
g_mutex_unlock (&priv->mutex);
|
|
break;
|
|
default:
|
|
if (GST_EVENT_IS_SERIALIZED (event)) {
|
|
GST_DEBUG_OBJECT (appsrc, "queue event: %" GST_PTR_FORMAT, event);
|
|
g_mutex_lock (&priv->mutex);
|
|
|
|
gst_queue_array_push_tail (priv->queue, event);
|
|
|
|
if ((priv->wait_status & STREAM_WAITING))
|
|
g_cond_broadcast (&priv->cond);
|
|
|
|
g_mutex_unlock (&priv->mutex);
|
|
return TRUE;
|
|
}
|
|
break;
|
|
}
|
|
|
|
return GST_CALL_PARENT_WITH_DEFAULT (GST_ELEMENT_CLASS, send_event, (element,
|
|
event), FALSE);
|
|
}
|
|
|
|
static gboolean
|
|
gst_app_src_unlock (GstBaseSrc * bsrc)
|
|
{
|
|
GstAppSrc *appsrc = GST_APP_SRC_CAST (bsrc);
|
|
GstAppSrcPrivate *priv = appsrc->priv;
|
|
|
|
g_mutex_lock (&priv->mutex);
|
|
GST_DEBUG_OBJECT (appsrc, "unlock start");
|
|
priv->flushing = TRUE;
|
|
g_cond_broadcast (&priv->cond);
|
|
g_mutex_unlock (&priv->mutex);
|
|
|
|
return TRUE;
|
|
}
|
|
|
|
static gboolean
|
|
gst_app_src_unlock_stop (GstBaseSrc * bsrc)
|
|
{
|
|
GstAppSrc *appsrc = GST_APP_SRC_CAST (bsrc);
|
|
GstAppSrcPrivate *priv = appsrc->priv;
|
|
|
|
g_mutex_lock (&priv->mutex);
|
|
GST_DEBUG_OBJECT (appsrc, "unlock stop");
|
|
priv->flushing = FALSE;
|
|
g_cond_broadcast (&priv->cond);
|
|
g_mutex_unlock (&priv->mutex);
|
|
|
|
return TRUE;
|
|
}
|
|
|
|
static gboolean
|
|
gst_app_src_start (GstBaseSrc * bsrc)
|
|
{
|
|
GstAppSrc *appsrc = GST_APP_SRC_CAST (bsrc);
|
|
GstAppSrcPrivate *priv = appsrc->priv;
|
|
|
|
g_mutex_lock (&priv->mutex);
|
|
GST_DEBUG_OBJECT (appsrc, "starting");
|
|
priv->started = TRUE;
|
|
/* set the offset to -1 so that we always do a first seek. This is only used
|
|
* in random-access mode. */
|
|
priv->offset = -1;
|
|
priv->flushing = FALSE;
|
|
g_mutex_unlock (&priv->mutex);
|
|
|
|
gst_base_src_set_format (bsrc, priv->format);
|
|
gst_segment_init (&priv->last_segment, priv->format);
|
|
gst_segment_init (&priv->current_segment, priv->format);
|
|
priv->pending_custom_segment = FALSE;
|
|
|
|
return TRUE;
|
|
}
|
|
|
|
static gboolean
|
|
gst_app_src_stop (GstBaseSrc * bsrc)
|
|
{
|
|
GstAppSrc *appsrc = GST_APP_SRC_CAST (bsrc);
|
|
GstAppSrcPrivate *priv = appsrc->priv;
|
|
|
|
g_mutex_lock (&priv->mutex);
|
|
GST_DEBUG_OBJECT (appsrc, "stopping");
|
|
priv->is_eos = FALSE;
|
|
priv->flushing = TRUE;
|
|
priv->started = FALSE;
|
|
priv->posted_latency_msg = FALSE;
|
|
gst_app_src_flush_queued (appsrc, TRUE);
|
|
g_cond_broadcast (&priv->cond);
|
|
g_mutex_unlock (&priv->mutex);
|
|
|
|
return TRUE;
|
|
}
|
|
|
|
static gboolean
|
|
gst_app_src_is_seekable (GstBaseSrc * src)
|
|
{
|
|
GstAppSrc *appsrc = GST_APP_SRC_CAST (src);
|
|
GstAppSrcPrivate *priv = appsrc->priv;
|
|
gboolean res = FALSE;
|
|
|
|
switch (priv->stream_type) {
|
|
case GST_APP_STREAM_TYPE_STREAM:
|
|
break;
|
|
case GST_APP_STREAM_TYPE_SEEKABLE:
|
|
case GST_APP_STREAM_TYPE_RANDOM_ACCESS:
|
|
res = TRUE;
|
|
break;
|
|
}
|
|
return res;
|
|
}
|
|
|
|
static gboolean
|
|
gst_app_src_do_get_size (GstBaseSrc * src, guint64 * size)
|
|
{
|
|
GstAppSrc *appsrc = GST_APP_SRC_CAST (src);
|
|
|
|
*size = gst_app_src_get_size (appsrc);
|
|
|
|
return TRUE;
|
|
}
|
|
|
|
static gboolean
|
|
gst_app_src_query (GstBaseSrc * src, GstQuery * query)
|
|
{
|
|
GstAppSrc *appsrc = GST_APP_SRC_CAST (src);
|
|
GstAppSrcPrivate *priv = appsrc->priv;
|
|
gboolean res;
|
|
|
|
switch (GST_QUERY_TYPE (query)) {
|
|
case GST_QUERY_LATENCY:
|
|
{
|
|
GstClockTime min, max;
|
|
gboolean live;
|
|
|
|
/* Query the parent class for the defaults */
|
|
res = gst_base_src_query_latency (src, &live, &min, &max);
|
|
|
|
/* overwrite with our values when we need to */
|
|
g_mutex_lock (&priv->mutex);
|
|
if (priv->min_latency != -1) {
|
|
min = priv->min_latency;
|
|
max = priv->max_latency;
|
|
}
|
|
g_mutex_unlock (&priv->mutex);
|
|
|
|
gst_query_set_latency (query, live, min, max);
|
|
break;
|
|
}
|
|
case GST_QUERY_SCHEDULING:
|
|
{
|
|
gst_query_set_scheduling (query, GST_SCHEDULING_FLAG_SEEKABLE, 1, -1, 0);
|
|
gst_query_add_scheduling_mode (query, GST_PAD_MODE_PUSH);
|
|
|
|
switch (priv->stream_type) {
|
|
case GST_APP_STREAM_TYPE_STREAM:
|
|
case GST_APP_STREAM_TYPE_SEEKABLE:
|
|
break;
|
|
case GST_APP_STREAM_TYPE_RANDOM_ACCESS:
|
|
gst_query_add_scheduling_mode (query, GST_PAD_MODE_PULL);
|
|
break;
|
|
}
|
|
res = TRUE;
|
|
break;
|
|
}
|
|
case GST_QUERY_DURATION:
|
|
{
|
|
GstFormat format;
|
|
gst_query_parse_duration (query, &format, NULL);
|
|
if (format == GST_FORMAT_BYTES) {
|
|
gst_query_set_duration (query, format, priv->size);
|
|
res = TRUE;
|
|
} else if (format == GST_FORMAT_TIME) {
|
|
if (priv->duration != GST_CLOCK_TIME_NONE) {
|
|
gst_query_set_duration (query, format, priv->duration);
|
|
res = TRUE;
|
|
} else {
|
|
res = FALSE;
|
|
}
|
|
} else {
|
|
res = FALSE;
|
|
}
|
|
break;
|
|
}
|
|
default:
|
|
res = GST_BASE_SRC_CLASS (parent_class)->query (src, query);
|
|
break;
|
|
}
|
|
|
|
return res;
|
|
}
|
|
|
|
/* will be called in push mode */
|
|
static gboolean
|
|
gst_app_src_do_seek (GstBaseSrc * src, GstSegment * segment)
|
|
{
|
|
GstAppSrc *appsrc = GST_APP_SRC_CAST (src);
|
|
GstAppSrcPrivate *priv = appsrc->priv;
|
|
gint64 desired_position;
|
|
gboolean res = FALSE;
|
|
gboolean emit;
|
|
Callbacks *callbacks = NULL;
|
|
|
|
desired_position = segment->position;
|
|
|
|
/* no need to try to seek in streaming mode */
|
|
if (priv->stream_type == GST_APP_STREAM_TYPE_STREAM)
|
|
return TRUE;
|
|
|
|
GST_DEBUG_OBJECT (appsrc, "seeking to %" G_GINT64_FORMAT ", format %s",
|
|
desired_position, gst_format_get_name (segment->format));
|
|
|
|
g_mutex_lock (&priv->mutex);
|
|
emit = priv->emit_signals;
|
|
if (priv->callbacks)
|
|
callbacks = callbacks_ref (priv->callbacks);
|
|
g_mutex_unlock (&priv->mutex);
|
|
|
|
if (callbacks && callbacks->callbacks.seek_data) {
|
|
res =
|
|
callbacks->callbacks.seek_data (appsrc, desired_position,
|
|
callbacks->user_data);
|
|
} else if (emit) {
|
|
g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_SEEK_DATA], 0,
|
|
desired_position, &res);
|
|
}
|
|
|
|
g_clear_pointer (&callbacks, callbacks_unref);
|
|
|
|
if (res) {
|
|
GST_DEBUG_OBJECT (appsrc, "flushing queue");
|
|
g_mutex_lock (&priv->mutex);
|
|
gst_app_src_flush_queued (appsrc, TRUE);
|
|
gst_segment_copy_into (segment, &priv->last_segment);
|
|
gst_segment_copy_into (segment, &priv->current_segment);
|
|
priv->pending_custom_segment = FALSE;
|
|
g_mutex_unlock (&priv->mutex);
|
|
priv->is_eos = FALSE;
|
|
} else {
|
|
GST_WARNING_OBJECT (appsrc, "seek failed");
|
|
}
|
|
|
|
return res;
|
|
}
|
|
|
|
/* must be called with the appsrc mutex */
|
|
static gboolean
|
|
gst_app_src_emit_seek (GstAppSrc * appsrc, guint64 offset)
|
|
{
|
|
gboolean res = FALSE;
|
|
gboolean emit;
|
|
GstAppSrcPrivate *priv = appsrc->priv;
|
|
Callbacks *callbacks = NULL;
|
|
|
|
emit = priv->emit_signals;
|
|
if (priv->callbacks)
|
|
callbacks = callbacks_ref (priv->callbacks);
|
|
g_mutex_unlock (&priv->mutex);
|
|
|
|
GST_DEBUG_OBJECT (appsrc,
|
|
"we are at %" G_GINT64_FORMAT ", seek to %" G_GINT64_FORMAT,
|
|
priv->offset, offset);
|
|
|
|
if (callbacks && callbacks->callbacks.seek_data)
|
|
res = callbacks->callbacks.seek_data (appsrc, offset, callbacks->user_data);
|
|
else if (emit)
|
|
g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_SEEK_DATA], 0,
|
|
offset, &res);
|
|
|
|
g_clear_pointer (&callbacks, callbacks_unref);
|
|
|
|
g_mutex_lock (&priv->mutex);
|
|
|
|
return res;
|
|
}
|
|
|
|
/* must be called with the appsrc mutex. After this call things can be
|
|
* flushing */
|
|
static void
|
|
gst_app_src_emit_need_data (GstAppSrc * appsrc, guint size)
|
|
{
|
|
gboolean emit;
|
|
GstAppSrcPrivate *priv = appsrc->priv;
|
|
Callbacks *callbacks = NULL;
|
|
|
|
emit = priv->emit_signals;
|
|
if (priv->callbacks)
|
|
callbacks = callbacks_ref (priv->callbacks);
|
|
g_mutex_unlock (&priv->mutex);
|
|
|
|
/* we have no data, we need some. We fire the signal with the size hint. */
|
|
if (callbacks && callbacks->callbacks.need_data)
|
|
callbacks->callbacks.need_data (appsrc, size, callbacks->user_data);
|
|
else if (emit)
|
|
g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_NEED_DATA], 0, size,
|
|
NULL);
|
|
|
|
g_clear_pointer (&callbacks, callbacks_unref);
|
|
|
|
g_mutex_lock (&priv->mutex);
|
|
/* we can be flushing now because we released the lock */
|
|
}
|
|
|
|
/* must be called with the appsrc mutex */
|
|
static gboolean
|
|
gst_app_src_do_negotiate (GstBaseSrc * basesrc)
|
|
{
|
|
GstAppSrc *appsrc = GST_APP_SRC_CAST (basesrc);
|
|
GstAppSrcPrivate *priv = appsrc->priv;
|
|
gboolean result = TRUE;
|
|
GstCaps *caps;
|
|
|
|
GST_OBJECT_LOCK (basesrc);
|
|
caps = priv->current_caps ? gst_caps_ref (priv->current_caps) : NULL;
|
|
GST_OBJECT_UNLOCK (basesrc);
|
|
|
|
/* Avoid deadlock by unlocking mutex
|
|
* otherwise we get deadlock between this and stream lock */
|
|
g_mutex_unlock (&priv->mutex);
|
|
if (caps) {
|
|
result = gst_base_src_set_caps (basesrc, caps);
|
|
gst_caps_unref (caps);
|
|
}
|
|
g_mutex_lock (&priv->mutex);
|
|
|
|
return result;
|
|
}
|
|
|
|
static gboolean
|
|
gst_app_src_negotiate (GstBaseSrc * basesrc)
|
|
{
|
|
GstAppSrc *appsrc = GST_APP_SRC_CAST (basesrc);
|
|
GstAppSrcPrivate *priv = appsrc->priv;
|
|
gboolean result;
|
|
|
|
g_mutex_lock (&priv->mutex);
|
|
result = gst_app_src_do_negotiate (basesrc);
|
|
g_mutex_unlock (&priv->mutex);
|
|
return result;
|
|
}
|
|
|
|
/* Update the currently queued bytes/buffers/time information for the item
|
|
* that was just removed from the queue.
|
|
*
|
|
* If update_offset is set, additionally the offset of the source will be
|
|
* moved forward accordingly as if that many bytes were output.
|
|
*/
|
|
static void
|
|
gst_app_src_update_queued_pop (GstAppSrc * appsrc, GstMiniObject * item,
|
|
gboolean update_offset)
|
|
{
|
|
GstAppSrcPrivate *priv = appsrc->priv;
|
|
guint buf_size = 0;
|
|
guint n_buffers = 0;
|
|
GstClockTime end_buffer_ts = GST_CLOCK_TIME_NONE;
|
|
|
|
if (GST_IS_BUFFER (item)) {
|
|
GstBuffer *buf = GST_BUFFER_CAST (item);
|
|
buf_size = gst_buffer_get_size (buf);
|
|
n_buffers = 1;
|
|
|
|
end_buffer_ts = GST_BUFFER_DTS_OR_PTS (buf);
|
|
if (end_buffer_ts != GST_CLOCK_TIME_NONE
|
|
&& GST_BUFFER_DURATION_IS_VALID (buf))
|
|
end_buffer_ts += GST_BUFFER_DURATION (buf);
|
|
|
|
GST_LOG_OBJECT (appsrc, "have buffer %p of size %u", buf, buf_size);
|
|
} else if (GST_IS_BUFFER_LIST (item)) {
|
|
GstBufferList *buffer_list = GST_BUFFER_LIST_CAST (item);
|
|
guint i;
|
|
|
|
n_buffers = gst_buffer_list_length (buffer_list);
|
|
|
|
for (i = 0; i < n_buffers; i++) {
|
|
GstBuffer *tmp = gst_buffer_list_get (buffer_list, i);
|
|
GstClockTime ts = GST_BUFFER_DTS_OR_PTS (tmp);
|
|
|
|
buf_size += gst_buffer_get_size (tmp);
|
|
/* Update to the last buffer's timestamp that is known */
|
|
if (ts != GST_CLOCK_TIME_NONE) {
|
|
end_buffer_ts = ts;
|
|
if (GST_BUFFER_DURATION_IS_VALID (tmp))
|
|
end_buffer_ts += GST_BUFFER_DURATION (tmp);
|
|
}
|
|
}
|
|
}
|
|
|
|
priv->queued_bytes -= buf_size;
|
|
priv->queued_buffers -= n_buffers;
|
|
|
|
/* Update time level if working on a TIME segment */
|
|
if ((priv->current_segment.format == GST_FORMAT_TIME
|
|
|| (priv->current_segment.format == GST_FORMAT_UNDEFINED
|
|
&& priv->last_segment.format == GST_FORMAT_TIME))
|
|
&& end_buffer_ts != GST_CLOCK_TIME_NONE) {
|
|
const GstSegment *segment =
|
|
priv->current_segment.format ==
|
|
GST_FORMAT_TIME ? &priv->current_segment : &priv->last_segment;
|
|
|
|
/* Clip to the current segment boundaries */
|
|
if (segment->stop != -1 && end_buffer_ts > segment->stop)
|
|
end_buffer_ts = segment->stop;
|
|
else if (segment->start > end_buffer_ts)
|
|
end_buffer_ts = segment->start;
|
|
|
|
priv->last_out_running_time =
|
|
gst_segment_to_running_time (segment, GST_FORMAT_TIME, end_buffer_ts);
|
|
|
|
GST_TRACE_OBJECT (appsrc,
|
|
"Last in running time %" GST_TIME_FORMAT ", last out running time %"
|
|
GST_TIME_FORMAT, GST_TIME_ARGS (priv->last_in_running_time),
|
|
GST_TIME_ARGS (priv->last_out_running_time));
|
|
|
|
/* If timestamps on both sides are known, calculate the current
|
|
* fill level in time and consider the queue empty if the output
|
|
* running time is lower than the input one (i.e. some kind of reset
|
|
* has happened).
|
|
*/
|
|
if (priv->last_out_running_time != GST_CLOCK_TIME_NONE
|
|
&& priv->last_in_running_time != GST_CLOCK_TIME_NONE) {
|
|
if (priv->last_out_running_time > priv->last_in_running_time) {
|
|
priv->queued_time = 0;
|
|
} else {
|
|
priv->queued_time =
|
|
priv->last_in_running_time - priv->last_out_running_time;
|
|
}
|
|
}
|
|
}
|
|
|
|
GST_DEBUG_OBJECT (appsrc,
|
|
"Currently queued: %" G_GUINT64_FORMAT " bytes, %" G_GUINT64_FORMAT
|
|
" buffers, %" GST_TIME_FORMAT, priv->queued_bytes,
|
|
priv->queued_buffers, GST_TIME_ARGS (priv->queued_time));
|
|
|
|
/* only update the offset when in random_access mode and when requested by
|
|
* the caller, i.e. not when just dropping the item */
|
|
if (update_offset && priv->stream_type == GST_APP_STREAM_TYPE_RANDOM_ACCESS)
|
|
priv->offset += buf_size;
|
|
}
|
|
|
|
/* Update the currently queued bytes/buffers/time information for the item
|
|
* that was just added to the queue.
|
|
*/
|
|
static void
|
|
gst_app_src_update_queued_push (GstAppSrc * appsrc, GstMiniObject * item)
|
|
{
|
|
GstAppSrcPrivate *priv = appsrc->priv;
|
|
GstClockTime start_buffer_ts = GST_CLOCK_TIME_NONE;
|
|
GstClockTime end_buffer_ts = GST_CLOCK_TIME_NONE;
|
|
guint buf_size = 0;
|
|
guint n_buffers = 0;
|
|
|
|
if (GST_IS_BUFFER (item)) {
|
|
GstBuffer *buf = GST_BUFFER_CAST (item);
|
|
|
|
buf_size = gst_buffer_get_size (buf);
|
|
n_buffers = 1;
|
|
|
|
start_buffer_ts = end_buffer_ts = GST_BUFFER_DTS_OR_PTS (buf);
|
|
if (end_buffer_ts != GST_CLOCK_TIME_NONE
|
|
&& GST_BUFFER_DURATION_IS_VALID (buf))
|
|
end_buffer_ts += GST_BUFFER_DURATION (buf);
|
|
} else if (GST_IS_BUFFER_LIST (item)) {
|
|
GstBufferList *buffer_list = GST_BUFFER_LIST_CAST (item);
|
|
guint i;
|
|
|
|
n_buffers = gst_buffer_list_length (buffer_list);
|
|
|
|
for (i = 0; i < n_buffers; i++) {
|
|
GstBuffer *tmp = gst_buffer_list_get (buffer_list, i);
|
|
GstClockTime ts = GST_BUFFER_DTS_OR_PTS (tmp);
|
|
|
|
buf_size += gst_buffer_get_size (tmp);
|
|
|
|
if (ts != GST_CLOCK_TIME_NONE) {
|
|
if (start_buffer_ts == GST_CLOCK_TIME_NONE)
|
|
start_buffer_ts = ts;
|
|
end_buffer_ts = ts;
|
|
if (GST_BUFFER_DURATION_IS_VALID (tmp))
|
|
end_buffer_ts += GST_BUFFER_DURATION (tmp);
|
|
}
|
|
}
|
|
}
|
|
|
|
priv->queued_bytes += buf_size;
|
|
priv->queued_buffers += n_buffers;
|
|
|
|
/* Update time level if working on a TIME segment */
|
|
if (priv->last_segment.format == GST_FORMAT_TIME
|
|
&& end_buffer_ts != GST_CLOCK_TIME_NONE) {
|
|
/* Clip to the last segment boundaries */
|
|
if (priv->last_segment.stop != -1
|
|
&& end_buffer_ts > priv->last_segment.stop)
|
|
end_buffer_ts = priv->last_segment.stop;
|
|
else if (priv->last_segment.start > end_buffer_ts)
|
|
end_buffer_ts = priv->last_segment.start;
|
|
|
|
priv->last_in_running_time =
|
|
gst_segment_to_running_time (&priv->last_segment, GST_FORMAT_TIME,
|
|
end_buffer_ts);
|
|
|
|
/* If this is the only buffer then we can directly update the queued time
|
|
* here. This is especially useful if this was the first buffer because
|
|
* otherwise we would have to wait until it is actually unqueued to know
|
|
* the queued duration */
|
|
if (priv->queued_buffers == 1) {
|
|
if (priv->last_segment.stop != -1
|
|
&& start_buffer_ts > priv->last_segment.stop)
|
|
start_buffer_ts = priv->last_segment.stop;
|
|
else if (priv->last_segment.start > start_buffer_ts)
|
|
start_buffer_ts = priv->last_segment.start;
|
|
|
|
priv->last_out_running_time =
|
|
gst_segment_to_running_time (&priv->last_segment, GST_FORMAT_TIME,
|
|
start_buffer_ts);
|
|
}
|
|
|
|
GST_TRACE_OBJECT (appsrc,
|
|
"Last in running time %" GST_TIME_FORMAT ", last out running time %"
|
|
GST_TIME_FORMAT, GST_TIME_ARGS (priv->last_in_running_time),
|
|
GST_TIME_ARGS (priv->last_out_running_time));
|
|
|
|
if (priv->last_out_running_time != GST_CLOCK_TIME_NONE
|
|
&& priv->last_in_running_time != GST_CLOCK_TIME_NONE) {
|
|
if (priv->last_out_running_time > priv->last_in_running_time) {
|
|
priv->queued_time = 0;
|
|
} else {
|
|
priv->queued_time =
|
|
priv->last_in_running_time - priv->last_out_running_time;
|
|
}
|
|
}
|
|
}
|
|
|
|
GST_DEBUG_OBJECT (appsrc,
|
|
"Currently queued: %" G_GUINT64_FORMAT " bytes, %" G_GUINT64_FORMAT
|
|
" buffers, %" GST_TIME_FORMAT, priv->queued_bytes, priv->queued_buffers,
|
|
GST_TIME_ARGS (priv->queued_time));
|
|
}
|
|
|
|
/* check if @obj should be send after the CAPS and SEGMENT events */
|
|
static gboolean
|
|
needs_segment (GstMiniObject * obj)
|
|
{
|
|
if (GST_IS_CAPS (obj)) {
|
|
return FALSE;
|
|
} else if (GST_IS_EVENT (obj)) {
|
|
if (GST_EVENT_TYPE (obj) == GST_EVENT_SEGMENT)
|
|
return FALSE;
|
|
}
|
|
|
|
return TRUE;
|
|
}
|
|
|
|
/* Called holding the priv->lock, and releases it temporarily */
|
|
static void
|
|
ensure_segment (GstAppSrc * appsrc)
|
|
{
|
|
GstAppSrcPrivate *priv = appsrc->priv;
|
|
GstEvent *seg_event;
|
|
|
|
seg_event = gst_pad_get_sticky_event (GST_BASE_SRC_PAD (appsrc),
|
|
GST_EVENT_SEGMENT, 0);
|
|
|
|
if (!seg_event) {
|
|
g_mutex_unlock (&priv->mutex);
|
|
GST_DEBUG_OBJECT (appsrc, "sending default segment");
|
|
gst_base_src_push_segment (GST_BASE_SRC_CAST (appsrc), &priv->last_segment);
|
|
g_mutex_lock (&priv->mutex);
|
|
} else {
|
|
gst_event_unref (seg_event);
|
|
}
|
|
}
|
|
|
|
static void
|
|
push_delayed_events (GstAppSrc * appsrc)
|
|
{
|
|
GstAppSrcPrivate *priv = appsrc->priv;
|
|
|
|
while (!gst_queue_array_is_empty (priv->delayed_events)) {
|
|
GstEvent *event;
|
|
|
|
event = gst_queue_array_pop_head (priv->delayed_events);
|
|
GST_DEBUG_OBJECT (appsrc, "sending event: %" GST_PTR_FORMAT, event);
|
|
|
|
g_mutex_unlock (&priv->mutex);
|
|
gst_pad_push_event (GST_BASE_SRC_PAD (appsrc), GST_EVENT (event));
|
|
g_mutex_lock (&priv->mutex);
|
|
}
|
|
}
|
|
|
|
static GstFlowReturn
|
|
gst_app_src_create (GstBaseSrc * bsrc, guint64 offset, guint size,
|
|
GstBuffer ** buf)
|
|
{
|
|
GstAppSrc *appsrc = GST_APP_SRC_CAST (bsrc);
|
|
GstAppSrcPrivate *priv = appsrc->priv;
|
|
GstFlowReturn ret;
|
|
|
|
GST_OBJECT_LOCK (appsrc);
|
|
if (G_UNLIKELY (priv->size != bsrc->segment.duration &&
|
|
bsrc->segment.format == GST_FORMAT_BYTES)) {
|
|
GST_DEBUG_OBJECT (appsrc,
|
|
"Size changed from %" G_GINT64_FORMAT " to %" G_GINT64_FORMAT,
|
|
bsrc->segment.duration, priv->size);
|
|
bsrc->segment.duration = priv->size;
|
|
GST_OBJECT_UNLOCK (appsrc);
|
|
|
|
gst_element_post_message (GST_ELEMENT (appsrc),
|
|
gst_message_new_duration_changed (GST_OBJECT (appsrc)));
|
|
} else if (G_UNLIKELY (priv->duration != bsrc->segment.duration &&
|
|
bsrc->segment.format == GST_FORMAT_TIME)) {
|
|
GST_DEBUG_OBJECT (appsrc,
|
|
"Duration changed from %" GST_TIME_FORMAT " to %" GST_TIME_FORMAT,
|
|
GST_TIME_ARGS (bsrc->segment.duration), GST_TIME_ARGS (priv->duration));
|
|
bsrc->segment.duration = priv->duration;
|
|
GST_OBJECT_UNLOCK (appsrc);
|
|
|
|
gst_element_post_message (GST_ELEMENT (appsrc),
|
|
gst_message_new_duration_changed (GST_OBJECT (appsrc)));
|
|
} else {
|
|
GST_OBJECT_UNLOCK (appsrc);
|
|
}
|
|
|
|
g_mutex_lock (&priv->mutex);
|
|
/* check flushing first */
|
|
if (G_UNLIKELY (priv->flushing))
|
|
goto flushing;
|
|
|
|
if (priv->stream_type == GST_APP_STREAM_TYPE_RANDOM_ACCESS) {
|
|
/* if we are dealing with a random-access stream, issue a seek if the offset
|
|
* changed. */
|
|
if (G_UNLIKELY (priv->offset != offset)) {
|
|
gboolean res;
|
|
|
|
/* do the seek */
|
|
res = gst_app_src_emit_seek (appsrc, offset);
|
|
|
|
if (G_UNLIKELY (!res))
|
|
/* failing to seek is fatal */
|
|
goto seek_error;
|
|
|
|
priv->offset = offset;
|
|
priv->is_eos = FALSE;
|
|
}
|
|
}
|
|
|
|
while (TRUE) {
|
|
/* Our lock may have been release to push events or caps, check out
|
|
* state in case we are now flushing. */
|
|
if (G_UNLIKELY (priv->flushing))
|
|
goto flushing;
|
|
|
|
/* return data as long as we have some */
|
|
if (!gst_queue_array_is_empty (priv->queue)) {
|
|
GstMiniObject *obj = gst_queue_array_pop_head (priv->queue);
|
|
|
|
if (priv->current_caps && needs_segment (obj)) {
|
|
/* need to have sent a segment before sending `obj` */
|
|
ensure_segment (appsrc);
|
|
}
|
|
|
|
if (GST_IS_CAPS (obj)) {
|
|
GstCaps *next_caps = GST_CAPS (obj);
|
|
gboolean caps_changed = TRUE;
|
|
|
|
GST_DEBUG_OBJECT (appsrc, "pop caps %" GST_PTR_FORMAT, next_caps);
|
|
|
|
if (next_caps && priv->current_caps)
|
|
caps_changed = !gst_caps_is_equal (next_caps, priv->current_caps);
|
|
else
|
|
caps_changed = (next_caps != priv->current_caps);
|
|
|
|
gst_caps_replace (&priv->current_caps, next_caps);
|
|
|
|
if (next_caps) {
|
|
gst_caps_unref (next_caps);
|
|
}
|
|
|
|
if (caps_changed)
|
|
gst_app_src_do_negotiate (bsrc);
|
|
|
|
/* sending delayed events which were waiting on the caps */
|
|
if (!gst_queue_array_is_empty (priv->delayed_events)) {
|
|
/* need to send a segment before the events */
|
|
ensure_segment (appsrc);
|
|
|
|
GST_DEBUG_OBJECT (appsrc,
|
|
"sending delayed events after caps: %" GST_PTR_FORMAT, obj);
|
|
|
|
push_delayed_events (appsrc);
|
|
}
|
|
|
|
/* Continue checks caps and queue */
|
|
continue;
|
|
}
|
|
|
|
if (GST_IS_BUFFER (obj)) {
|
|
GstBuffer *buffer = GST_BUFFER (obj);
|
|
|
|
GST_DEBUG_OBJECT (appsrc, "pop buffer %" GST_PTR_FORMAT, buffer);
|
|
|
|
/* Mark the buffer as DISCONT if we previously dropped a buffer
|
|
* instead of outputting it */
|
|
if (priv->need_discont_downstream) {
|
|
buffer = gst_buffer_make_writable (buffer);
|
|
GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
|
|
priv->need_discont_downstream = FALSE;
|
|
}
|
|
|
|
if (!gst_queue_array_is_empty (priv->delayed_events)) {
|
|
/* don't keep delaying events if a buffer has been pushed without CAPS */
|
|
GST_DEBUG_OBJECT (appsrc, "push delayed events before buffer");
|
|
push_delayed_events (appsrc);
|
|
}
|
|
|
|
priv->pushed_buffer = TRUE;
|
|
*buf = buffer;
|
|
} else if (GST_IS_BUFFER_LIST (obj)) {
|
|
GstBufferList *buffer_list;
|
|
|
|
buffer_list = GST_BUFFER_LIST (obj);
|
|
GST_DEBUG_OBJECT (appsrc, "pop buffer list %" GST_PTR_FORMAT,
|
|
buffer_list);
|
|
|
|
/* Mark the first buffer of the buffer list as DISCONT if we
|
|
* previously dropped a buffer instead of outputting it */
|
|
if (priv->need_discont_downstream) {
|
|
GstBuffer *buffer;
|
|
|
|
buffer_list = gst_buffer_list_make_writable (buffer_list);
|
|
buffer = gst_buffer_list_get_writable (buffer_list, 0);
|
|
GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
|
|
priv->need_discont_downstream = FALSE;
|
|
}
|
|
|
|
if (!gst_queue_array_is_empty (priv->delayed_events)) {
|
|
/* don't keep delaying events if a buffer has been pushed without CAPS */
|
|
GST_DEBUG_OBJECT (appsrc, "push delayed events before buffer");
|
|
push_delayed_events (appsrc);
|
|
}
|
|
|
|
gst_base_src_submit_buffer_list (bsrc, buffer_list);
|
|
priv->pushed_buffer = TRUE;
|
|
*buf = NULL;
|
|
} else if (GST_IS_EVENT (obj)) {
|
|
GstEvent *event = GST_EVENT (obj);
|
|
|
|
GST_DEBUG_OBJECT (appsrc, "pop event %" GST_PTR_FORMAT, event);
|
|
|
|
if (GST_EVENT_TYPE (event) == GST_EVENT_SEGMENT) {
|
|
const GstSegment *segment = NULL;
|
|
|
|
gst_event_parse_segment (event, &segment);
|
|
g_assert (segment != NULL);
|
|
|
|
if (!gst_segment_is_equal (&priv->current_segment, segment)) {
|
|
GST_DEBUG_OBJECT (appsrc,
|
|
"Update new segment %" GST_PTR_FORMAT, event);
|
|
if (!gst_base_src_push_segment (bsrc, segment)) {
|
|
GST_ERROR_OBJECT (appsrc,
|
|
"Couldn't set new segment %" GST_PTR_FORMAT, event);
|
|
gst_event_unref (event);
|
|
goto invalid_segment;
|
|
}
|
|
gst_segment_copy_into (segment, &priv->current_segment);
|
|
}
|
|
|
|
gst_event_unref (event);
|
|
} else {
|
|
/* event is serialized with the buffers flow */
|
|
|
|
if (!priv->current_caps && !priv->pushed_buffer) {
|
|
GST_DEBUG_OBJECT (appsrc,
|
|
"did not send caps yet, delay event for now");
|
|
gst_queue_array_push_tail (priv->delayed_events, event);
|
|
} else {
|
|
/* We are about to push an event, release our lock */
|
|
g_mutex_unlock (&priv->mutex);
|
|
gst_pad_push_event (GST_BASE_SRC_PAD (appsrc), event);
|
|
g_mutex_lock (&priv->mutex);
|
|
}
|
|
}
|
|
continue;
|
|
} else {
|
|
g_assert_not_reached ();
|
|
}
|
|
|
|
gst_app_src_update_queued_pop (appsrc, obj, TRUE);
|
|
|
|
/* signal that we removed an item */
|
|
if ((priv->wait_status & APP_WAITING))
|
|
g_cond_broadcast (&priv->cond);
|
|
|
|
/* see if we go lower than the min-percent */
|
|
if (priv->min_percent) {
|
|
if ((priv->max_bytes
|
|
&& priv->queued_bytes * 100 / priv->max_bytes <=
|
|
priv->min_percent) || (priv->max_buffers
|
|
&& priv->queued_buffers * 100 / priv->max_buffers <=
|
|
priv->min_percent) || (priv->max_time
|
|
&& priv->queued_time * 100 / priv->max_time <=
|
|
priv->min_percent)) {
|
|
/* ignore flushing state, we got a buffer and we will return it now.
|
|
* Errors will be handled in the next round */
|
|
gst_app_src_emit_need_data (appsrc, size);
|
|
}
|
|
}
|
|
ret = GST_FLOW_OK;
|
|
break;
|
|
} else {
|
|
gst_app_src_emit_need_data (appsrc, size);
|
|
|
|
/* we can be flushing now because we released the lock above */
|
|
if (G_UNLIKELY (priv->flushing))
|
|
goto flushing;
|
|
|
|
/* if we have a buffer now, continue the loop and try to return it. In
|
|
* random-access mode (where a buffer is normally pushed in the above
|
|
* signal) we can still be empty because the pushed buffer got flushed or
|
|
* when the application pushes the requested buffer later, we support both
|
|
* possibilities. */
|
|
if (!gst_queue_array_is_empty (priv->queue))
|
|
continue;
|
|
|
|
/* no buffer yet, maybe we are EOS, if not, block for more data. */
|
|
}
|
|
|
|
/* check EOS */
|
|
if (G_UNLIKELY (priv->is_eos))
|
|
goto eos;
|
|
|
|
/* nothing to return, wait a while for new data or flushing. */
|
|
priv->wait_status |= STREAM_WAITING;
|
|
g_cond_wait (&priv->cond, &priv->mutex);
|
|
priv->wait_status &= ~STREAM_WAITING;
|
|
}
|
|
g_mutex_unlock (&priv->mutex);
|
|
return ret;
|
|
|
|
/* ERRORS */
|
|
flushing:
|
|
{
|
|
GST_DEBUG_OBJECT (appsrc, "we are flushing");
|
|
g_mutex_unlock (&priv->mutex);
|
|
return GST_FLOW_FLUSHING;
|
|
}
|
|
eos:
|
|
{
|
|
GST_DEBUG_OBJECT (appsrc, "we are EOS");
|
|
g_mutex_unlock (&priv->mutex);
|
|
return GST_FLOW_EOS;
|
|
}
|
|
seek_error:
|
|
{
|
|
g_mutex_unlock (&priv->mutex);
|
|
GST_ELEMENT_ERROR (appsrc, RESOURCE, READ, ("failed to seek"),
|
|
GST_ERROR_SYSTEM);
|
|
return GST_FLOW_ERROR;
|
|
}
|
|
|
|
invalid_segment:
|
|
{
|
|
g_mutex_unlock (&priv->mutex);
|
|
GST_ELEMENT_ERROR (appsrc, LIBRARY, SETTINGS,
|
|
(NULL), ("Failed to configure the provided input segment."));
|
|
return GST_FLOW_ERROR;
|
|
}
|
|
}
|
|
|
|
/* external API */
|
|
|
|
/**
|
|
* gst_app_src_set_caps:
|
|
* @appsrc: a #GstAppSrc
|
|
* @caps: (nullable): caps to set
|
|
*
|
|
* Set the capabilities on the appsrc element. This function takes
|
|
* a copy of the caps structure. After calling this method, the source will
|
|
* only produce caps that match @caps. @caps must be fixed and the caps on the
|
|
* buffers must match the caps or left NULL.
|
|
*/
|
|
void
|
|
gst_app_src_set_caps (GstAppSrc * appsrc, const GstCaps * caps)
|
|
{
|
|
GstAppSrcPrivate *priv;
|
|
gboolean caps_changed;
|
|
|
|
g_return_if_fail (GST_IS_APP_SRC (appsrc));
|
|
|
|
priv = appsrc->priv;
|
|
|
|
g_mutex_lock (&priv->mutex);
|
|
|
|
GST_OBJECT_LOCK (appsrc);
|
|
if (caps && priv->last_caps)
|
|
caps_changed = !gst_caps_is_equal (caps, priv->last_caps);
|
|
else
|
|
caps_changed = (caps != priv->last_caps);
|
|
|
|
if (caps_changed) {
|
|
GstCaps *new_caps;
|
|
gpointer t;
|
|
|
|
new_caps = caps ? gst_caps_copy (caps) : NULL;
|
|
GST_DEBUG_OBJECT (appsrc, "setting caps to %" GST_PTR_FORMAT, caps);
|
|
|
|
while ((t = gst_queue_array_peek_tail (priv->queue)) && GST_IS_CAPS (t)) {
|
|
gst_caps_unref (gst_queue_array_pop_tail (priv->queue));
|
|
}
|
|
gst_queue_array_push_tail (priv->queue, new_caps);
|
|
gst_caps_replace (&priv->last_caps, new_caps);
|
|
|
|
if ((priv->wait_status & STREAM_WAITING))
|
|
g_cond_broadcast (&priv->cond);
|
|
}
|
|
|
|
GST_OBJECT_UNLOCK (appsrc);
|
|
|
|
g_mutex_unlock (&priv->mutex);
|
|
}
|
|
|
|
/**
|
|
* gst_app_src_get_caps:
|
|
* @appsrc: a #GstAppSrc
|
|
*
|
|
* Get the configured caps on @appsrc.
|
|
*
|
|
* Returns: (nullable) (transfer full): the #GstCaps produced by the source. gst_caps_unref() after usage.
|
|
*/
|
|
GstCaps *
|
|
gst_app_src_get_caps (GstAppSrc * appsrc)
|
|
{
|
|
|
|
GstCaps *caps;
|
|
|
|
g_return_val_if_fail (GST_IS_APP_SRC (appsrc), NULL);
|
|
|
|
GST_OBJECT_LOCK (appsrc);
|
|
if ((caps = appsrc->priv->last_caps))
|
|
gst_caps_ref (caps);
|
|
GST_OBJECT_UNLOCK (appsrc);
|
|
|
|
return caps;
|
|
|
|
}
|
|
|
|
/**
|
|
* gst_app_src_set_size:
|
|
* @appsrc: a #GstAppSrc
|
|
* @size: the size to set
|
|
*
|
|
* Set the size of the stream in bytes. A value of -1 means that the size is
|
|
* not known.
|
|
*/
|
|
void
|
|
gst_app_src_set_size (GstAppSrc * appsrc, gint64 size)
|
|
{
|
|
GstAppSrcPrivate *priv;
|
|
|
|
g_return_if_fail (GST_IS_APP_SRC (appsrc));
|
|
|
|
priv = appsrc->priv;
|
|
|
|
GST_OBJECT_LOCK (appsrc);
|
|
GST_DEBUG_OBJECT (appsrc, "setting size of %" G_GINT64_FORMAT, size);
|
|
priv->size = size;
|
|
GST_OBJECT_UNLOCK (appsrc);
|
|
}
|
|
|
|
/**
|
|
* gst_app_src_get_size:
|
|
* @appsrc: a #GstAppSrc
|
|
*
|
|
* Get the size of the stream in bytes. A value of -1 means that the size is
|
|
* not known.
|
|
*
|
|
* Returns: the size of the stream previously set with gst_app_src_set_size();
|
|
*/
|
|
gint64
|
|
gst_app_src_get_size (GstAppSrc * appsrc)
|
|
{
|
|
gint64 size;
|
|
GstAppSrcPrivate *priv;
|
|
|
|
g_return_val_if_fail (GST_IS_APP_SRC (appsrc), -1);
|
|
|
|
priv = appsrc->priv;
|
|
|
|
GST_OBJECT_LOCK (appsrc);
|
|
size = priv->size;
|
|
GST_DEBUG_OBJECT (appsrc, "getting size of %" G_GINT64_FORMAT, size);
|
|
GST_OBJECT_UNLOCK (appsrc);
|
|
|
|
return size;
|
|
}
|
|
|
|
/**
|
|
* gst_app_src_set_duration:
|
|
* @appsrc: a #GstAppSrc
|
|
* @duration: the duration to set
|
|
*
|
|
* Set the duration of the stream in nanoseconds. A value of GST_CLOCK_TIME_NONE means that the duration is
|
|
* not known.
|
|
*
|
|
* Since: 1.10
|
|
*/
|
|
void
|
|
gst_app_src_set_duration (GstAppSrc * appsrc, GstClockTime duration)
|
|
{
|
|
GstAppSrcPrivate *priv;
|
|
|
|
g_return_if_fail (GST_IS_APP_SRC (appsrc));
|
|
|
|
priv = appsrc->priv;
|
|
|
|
GST_OBJECT_LOCK (appsrc);
|
|
GST_DEBUG_OBJECT (appsrc, "setting duration of %" GST_TIME_FORMAT,
|
|
GST_TIME_ARGS (duration));
|
|
priv->duration = duration;
|
|
GST_OBJECT_UNLOCK (appsrc);
|
|
}
|
|
|
|
/**
|
|
* gst_app_src_get_duration:
|
|
* @appsrc: a #GstAppSrc
|
|
*
|
|
* Get the duration of the stream in nanoseconds. A value of GST_CLOCK_TIME_NONE means that the duration is
|
|
* not known.
|
|
*
|
|
* Returns: the duration of the stream previously set with gst_app_src_set_duration();
|
|
*
|
|
* Since: 1.10
|
|
*/
|
|
GstClockTime
|
|
gst_app_src_get_duration (GstAppSrc * appsrc)
|
|
{
|
|
GstClockTime duration;
|
|
GstAppSrcPrivate *priv;
|
|
|
|
g_return_val_if_fail (GST_IS_APP_SRC (appsrc), GST_CLOCK_TIME_NONE);
|
|
|
|
priv = appsrc->priv;
|
|
|
|
GST_OBJECT_LOCK (appsrc);
|
|
duration = priv->duration;
|
|
GST_DEBUG_OBJECT (appsrc, "getting duration of %" GST_TIME_FORMAT,
|
|
GST_TIME_ARGS (duration));
|
|
GST_OBJECT_UNLOCK (appsrc);
|
|
|
|
return duration;
|
|
}
|
|
|
|
/**
|
|
* gst_app_src_set_stream_type:
|
|
* @appsrc: a #GstAppSrc
|
|
* @type: the new state
|
|
*
|
|
* Set the stream type on @appsrc. For seekable streams, the "seek" signal must
|
|
* be connected to.
|
|
*
|
|
* A stream_type stream
|
|
*/
|
|
void
|
|
gst_app_src_set_stream_type (GstAppSrc * appsrc, GstAppStreamType type)
|
|
{
|
|
GstAppSrcPrivate *priv;
|
|
|
|
g_return_if_fail (GST_IS_APP_SRC (appsrc));
|
|
|
|
priv = appsrc->priv;
|
|
|
|
GST_OBJECT_LOCK (appsrc);
|
|
GST_DEBUG_OBJECT (appsrc, "setting stream_type of %d", type);
|
|
priv->stream_type = type;
|
|
GST_OBJECT_UNLOCK (appsrc);
|
|
}
|
|
|
|
/**
|
|
* gst_app_src_get_stream_type:
|
|
* @appsrc: a #GstAppSrc
|
|
*
|
|
* Get the stream type. Control the stream type of @appsrc
|
|
* with gst_app_src_set_stream_type().
|
|
*
|
|
* Returns: the stream type.
|
|
*/
|
|
GstAppStreamType
|
|
gst_app_src_get_stream_type (GstAppSrc * appsrc)
|
|
{
|
|
gboolean stream_type;
|
|
GstAppSrcPrivate *priv;
|
|
|
|
g_return_val_if_fail (GST_IS_APP_SRC (appsrc), FALSE);
|
|
|
|
priv = appsrc->priv;
|
|
|
|
GST_OBJECT_LOCK (appsrc);
|
|
stream_type = priv->stream_type;
|
|
GST_DEBUG_OBJECT (appsrc, "getting stream_type of %d", stream_type);
|
|
GST_OBJECT_UNLOCK (appsrc);
|
|
|
|
return stream_type;
|
|
}
|
|
|
|
/**
|
|
* gst_app_src_set_max_bytes:
|
|
* @appsrc: a #GstAppSrc
|
|
* @max: the maximum number of bytes to queue
|
|
*
|
|
* Set the maximum amount of bytes that can be queued in @appsrc.
|
|
* After the maximum amount of bytes are queued, @appsrc will emit the
|
|
* "enough-data" signal.
|
|
*/
|
|
void
|
|
gst_app_src_set_max_bytes (GstAppSrc * appsrc, guint64 max)
|
|
{
|
|
GstAppSrcPrivate *priv;
|
|
|
|
g_return_if_fail (GST_IS_APP_SRC (appsrc));
|
|
|
|
priv = appsrc->priv;
|
|
|
|
g_mutex_lock (&priv->mutex);
|
|
if (max != priv->max_bytes) {
|
|
GST_DEBUG_OBJECT (appsrc, "setting max-bytes to %" G_GUINT64_FORMAT, max);
|
|
priv->max_bytes = max;
|
|
/* signal the change */
|
|
g_cond_broadcast (&priv->cond);
|
|
}
|
|
g_mutex_unlock (&priv->mutex);
|
|
}
|
|
|
|
/**
|
|
* gst_app_src_get_max_bytes:
|
|
* @appsrc: a #GstAppSrc
|
|
*
|
|
* Get the maximum amount of bytes that can be queued in @appsrc.
|
|
*
|
|
* Returns: The maximum amount of bytes that can be queued.
|
|
*/
|
|
guint64
|
|
gst_app_src_get_max_bytes (GstAppSrc * appsrc)
|
|
{
|
|
guint64 result;
|
|
GstAppSrcPrivate *priv;
|
|
|
|
g_return_val_if_fail (GST_IS_APP_SRC (appsrc), 0);
|
|
|
|
priv = appsrc->priv;
|
|
|
|
g_mutex_lock (&priv->mutex);
|
|
result = priv->max_bytes;
|
|
GST_DEBUG_OBJECT (appsrc, "getting max-bytes of %" G_GUINT64_FORMAT, result);
|
|
g_mutex_unlock (&priv->mutex);
|
|
|
|
return result;
|
|
}
|
|
|
|
/**
|
|
* gst_app_src_get_current_level_bytes:
|
|
* @appsrc: a #GstAppSrc
|
|
*
|
|
* Get the number of currently queued bytes inside @appsrc.
|
|
*
|
|
* Returns: The number of currently queued bytes.
|
|
*
|
|
* Since: 1.2
|
|
*/
|
|
guint64
|
|
gst_app_src_get_current_level_bytes (GstAppSrc * appsrc)
|
|
{
|
|
guint64 queued;
|
|
GstAppSrcPrivate *priv;
|
|
|
|
g_return_val_if_fail (GST_IS_APP_SRC (appsrc), -1);
|
|
|
|
priv = appsrc->priv;
|
|
|
|
GST_OBJECT_LOCK (appsrc);
|
|
queued = priv->queued_bytes;
|
|
GST_DEBUG_OBJECT (appsrc, "current level bytes is %" G_GUINT64_FORMAT,
|
|
queued);
|
|
GST_OBJECT_UNLOCK (appsrc);
|
|
|
|
return queued;
|
|
}
|
|
|
|
/**
|
|
* gst_app_src_set_max_buffers:
|
|
* @appsrc: a #GstAppSrc
|
|
* @max: the maximum number of buffers to queue
|
|
*
|
|
* Set the maximum amount of buffers that can be queued in @appsrc.
|
|
* After the maximum amount of buffers are queued, @appsrc will emit the
|
|
* "enough-data" signal.
|
|
*
|
|
* Since: 1.20
|
|
*/
|
|
void
|
|
gst_app_src_set_max_buffers (GstAppSrc * appsrc, guint64 max)
|
|
{
|
|
GstAppSrcPrivate *priv;
|
|
|
|
g_return_if_fail (GST_IS_APP_SRC (appsrc));
|
|
|
|
priv = appsrc->priv;
|
|
|
|
g_mutex_lock (&priv->mutex);
|
|
if (max != priv->max_buffers) {
|
|
GST_DEBUG_OBJECT (appsrc, "setting max-buffers to %" G_GUINT64_FORMAT, max);
|
|
priv->max_buffers = max;
|
|
/* signal the change */
|
|
g_cond_broadcast (&priv->cond);
|
|
}
|
|
g_mutex_unlock (&priv->mutex);
|
|
}
|
|
|
|
/**
|
|
* gst_app_src_get_max_buffers:
|
|
* @appsrc: a #GstAppSrc
|
|
*
|
|
* Get the maximum amount of buffers that can be queued in @appsrc.
|
|
*
|
|
* Returns: The maximum amount of buffers that can be queued.
|
|
*
|
|
* Since: 1.20
|
|
*/
|
|
guint64
|
|
gst_app_src_get_max_buffers (GstAppSrc * appsrc)
|
|
{
|
|
guint64 result;
|
|
GstAppSrcPrivate *priv;
|
|
|
|
g_return_val_if_fail (GST_IS_APP_SRC (appsrc), 0);
|
|
|
|
priv = appsrc->priv;
|
|
|
|
g_mutex_lock (&priv->mutex);
|
|
result = priv->max_buffers;
|
|
GST_DEBUG_OBJECT (appsrc, "getting max-buffers of %" G_GUINT64_FORMAT,
|
|
result);
|
|
g_mutex_unlock (&priv->mutex);
|
|
|
|
return result;
|
|
}
|
|
|
|
/**
|
|
* gst_app_src_get_current_level_buffers:
|
|
* @appsrc: a #GstAppSrc
|
|
*
|
|
* Get the number of currently queued buffers inside @appsrc.
|
|
*
|
|
* Returns: The number of currently queued buffers.
|
|
*
|
|
* Since: 1.20
|
|
*/
|
|
guint64
|
|
gst_app_src_get_current_level_buffers (GstAppSrc * appsrc)
|
|
{
|
|
guint64 queued;
|
|
GstAppSrcPrivate *priv;
|
|
|
|
g_return_val_if_fail (GST_IS_APP_SRC (appsrc), -1);
|
|
|
|
priv = appsrc->priv;
|
|
|
|
GST_OBJECT_LOCK (appsrc);
|
|
queued = priv->queued_buffers;
|
|
GST_DEBUG_OBJECT (appsrc, "current level buffers is %" G_GUINT64_FORMAT,
|
|
queued);
|
|
GST_OBJECT_UNLOCK (appsrc);
|
|
|
|
return queued;
|
|
}
|
|
|
|
/**
|
|
* gst_app_src_set_max_time:
|
|
* @appsrc: a #GstAppSrc
|
|
* @max: the maximum amonut of time to queue
|
|
*
|
|
* Set the maximum amount of time that can be queued in @appsrc.
|
|
* After the maximum amount of time are queued, @appsrc will emit the
|
|
* "enough-data" signal.
|
|
*
|
|
* Since: 1.20
|
|
*/
|
|
void
|
|
gst_app_src_set_max_time (GstAppSrc * appsrc, GstClockTime max)
|
|
{
|
|
GstAppSrcPrivate *priv;
|
|
|
|
g_return_if_fail (GST_IS_APP_SRC (appsrc));
|
|
|
|
priv = appsrc->priv;
|
|
|
|
g_mutex_lock (&priv->mutex);
|
|
if (max != priv->max_time) {
|
|
GST_DEBUG_OBJECT (appsrc, "setting max-time to %" GST_TIME_FORMAT,
|
|
GST_TIME_ARGS (max));
|
|
priv->max_time = max;
|
|
/* signal the change */
|
|
g_cond_broadcast (&priv->cond);
|
|
}
|
|
g_mutex_unlock (&priv->mutex);
|
|
}
|
|
|
|
/**
|
|
* gst_app_src_get_max_time:
|
|
* @appsrc: a #GstAppSrc
|
|
*
|
|
* Get the maximum amount of time that can be queued in @appsrc.
|
|
*
|
|
* Returns: The maximum amount of time that can be queued.
|
|
*
|
|
* Since: 1.20
|
|
*/
|
|
GstClockTime
|
|
gst_app_src_get_max_time (GstAppSrc * appsrc)
|
|
{
|
|
GstClockTime result;
|
|
GstAppSrcPrivate *priv;
|
|
|
|
g_return_val_if_fail (GST_IS_APP_SRC (appsrc), 0);
|
|
|
|
priv = appsrc->priv;
|
|
|
|
g_mutex_lock (&priv->mutex);
|
|
result = priv->max_time;
|
|
GST_DEBUG_OBJECT (appsrc, "getting max-time of %" GST_TIME_FORMAT,
|
|
GST_TIME_ARGS (result));
|
|
g_mutex_unlock (&priv->mutex);
|
|
|
|
return result;
|
|
}
|
|
|
|
/**
|
|
* gst_app_src_get_current_level_time:
|
|
* @appsrc: a #GstAppSrc
|
|
*
|
|
* Get the amount of currently queued time inside @appsrc.
|
|
*
|
|
* Returns: The amount of currently queued time.
|
|
*
|
|
* Since: 1.20
|
|
*/
|
|
GstClockTime
|
|
gst_app_src_get_current_level_time (GstAppSrc * appsrc)
|
|
{
|
|
gint64 queued;
|
|
GstAppSrcPrivate *priv;
|
|
|
|
g_return_val_if_fail (GST_IS_APP_SRC (appsrc), GST_CLOCK_TIME_NONE);
|
|
|
|
priv = appsrc->priv;
|
|
|
|
GST_OBJECT_LOCK (appsrc);
|
|
queued = priv->queued_time;
|
|
GST_DEBUG_OBJECT (appsrc, "current level time is %" GST_TIME_FORMAT,
|
|
GST_TIME_ARGS (queued));
|
|
GST_OBJECT_UNLOCK (appsrc);
|
|
|
|
return queued;
|
|
}
|
|
|
|
static void
|
|
gst_app_src_set_latencies (GstAppSrc * appsrc, gboolean do_min, guint64 min,
|
|
gboolean do_max, guint64 max)
|
|
{
|
|
GstAppSrcPrivate *priv = appsrc->priv;
|
|
gboolean changed = FALSE;
|
|
|
|
g_mutex_lock (&priv->mutex);
|
|
if (do_min && priv->min_latency != min) {
|
|
priv->min_latency = min;
|
|
changed = TRUE;
|
|
}
|
|
if (do_max && priv->max_latency != max) {
|
|
priv->max_latency = max;
|
|
changed = TRUE;
|
|
}
|
|
if (!priv->posted_latency_msg) {
|
|
priv->posted_latency_msg = TRUE;
|
|
changed = TRUE;
|
|
}
|
|
g_mutex_unlock (&priv->mutex);
|
|
|
|
if (changed) {
|
|
GST_DEBUG_OBJECT (appsrc, "posting latency changed");
|
|
gst_element_post_message (GST_ELEMENT_CAST (appsrc),
|
|
gst_message_new_latency (GST_OBJECT_CAST (appsrc)));
|
|
}
|
|
}
|
|
|
|
/**
|
|
* gst_app_src_set_leaky_type:
|
|
* @appsrc: a #GstAppSrc
|
|
* @leaky: the #GstAppLeakyType
|
|
*
|
|
* When set to any other value than GST_APP_LEAKY_TYPE_NONE then the appsrc
|
|
* will drop any buffers that are pushed into it once its internal queue is
|
|
* full. The selected type defines whether to drop the oldest or new
|
|
* buffers.
|
|
*
|
|
* Since: 1.20
|
|
*/
|
|
void
|
|
gst_app_src_set_leaky_type (GstAppSrc * appsrc, GstAppLeakyType leaky)
|
|
{
|
|
g_return_if_fail (GST_IS_APP_SRC (appsrc));
|
|
|
|
appsrc->priv->leaky_type = leaky;
|
|
}
|
|
|
|
/**
|
|
* gst_app_src_get_leaky_type:
|
|
* @appsrc: a #GstAppSrc
|
|
*
|
|
* Returns the currently set #GstAppLeakyType. See gst_app_src_set_leaky_type()
|
|
* for more details.
|
|
*
|
|
* Returns: The currently set #GstAppLeakyType.
|
|
*
|
|
* Since: 1.20
|
|
*/
|
|
GstAppLeakyType
|
|
gst_app_src_get_leaky_type (GstAppSrc * appsrc)
|
|
{
|
|
g_return_val_if_fail (GST_IS_APP_SRC (appsrc), GST_APP_LEAKY_TYPE_NONE);
|
|
|
|
return appsrc->priv->leaky_type;
|
|
}
|
|
|
|
/**
|
|
* gst_app_src_set_latency:
|
|
* @appsrc: a #GstAppSrc
|
|
* @min: the min latency
|
|
* @max: the max latency
|
|
*
|
|
* Configure the @min and @max latency in @src. If @min is set to -1, the
|
|
* default latency calculations for pseudo-live sources will be used.
|
|
*/
|
|
void
|
|
gst_app_src_set_latency (GstAppSrc * appsrc, guint64 min, guint64 max)
|
|
{
|
|
gst_app_src_set_latencies (appsrc, TRUE, min, TRUE, max);
|
|
}
|
|
|
|
/**
|
|
* gst_app_src_get_latency:
|
|
* @appsrc: a #GstAppSrc
|
|
* @min: (out): the min latency
|
|
* @max: (out): the max latency
|
|
*
|
|
* Retrieve the min and max latencies in @min and @max respectively.
|
|
*/
|
|
void
|
|
gst_app_src_get_latency (GstAppSrc * appsrc, guint64 * min, guint64 * max)
|
|
{
|
|
GstAppSrcPrivate *priv;
|
|
|
|
g_return_if_fail (GST_IS_APP_SRC (appsrc));
|
|
|
|
priv = appsrc->priv;
|
|
|
|
g_mutex_lock (&priv->mutex);
|
|
if (min)
|
|
*min = priv->min_latency;
|
|
if (max)
|
|
*max = priv->max_latency;
|
|
g_mutex_unlock (&priv->mutex);
|
|
}
|
|
|
|
/**
|
|
* gst_app_src_set_emit_signals:
|
|
* @appsrc: a #GstAppSrc
|
|
* @emit: the new state
|
|
*
|
|
* Make appsrc emit the "new-preroll" and "new-buffer" signals. This option is
|
|
* by default disabled because signal emission is expensive and unneeded when
|
|
* the application prefers to operate in pull mode.
|
|
*/
|
|
void
|
|
gst_app_src_set_emit_signals (GstAppSrc * appsrc, gboolean emit)
|
|
{
|
|
GstAppSrcPrivate *priv;
|
|
|
|
g_return_if_fail (GST_IS_APP_SRC (appsrc));
|
|
|
|
priv = appsrc->priv;
|
|
|
|
g_mutex_lock (&priv->mutex);
|
|
priv->emit_signals = emit;
|
|
g_mutex_unlock (&priv->mutex);
|
|
}
|
|
|
|
/**
|
|
* gst_app_src_get_emit_signals:
|
|
* @appsrc: a #GstAppSrc
|
|
*
|
|
* Check if appsrc will emit the "new-preroll" and "new-buffer" signals.
|
|
*
|
|
* Returns: %TRUE if @appsrc is emitting the "new-preroll" and "new-buffer"
|
|
* signals.
|
|
*/
|
|
gboolean
|
|
gst_app_src_get_emit_signals (GstAppSrc * appsrc)
|
|
{
|
|
gboolean result;
|
|
GstAppSrcPrivate *priv;
|
|
|
|
g_return_val_if_fail (GST_IS_APP_SRC (appsrc), FALSE);
|
|
|
|
priv = appsrc->priv;
|
|
|
|
g_mutex_lock (&priv->mutex);
|
|
result = priv->emit_signals;
|
|
g_mutex_unlock (&priv->mutex);
|
|
|
|
return result;
|
|
}
|
|
|
|
static GstFlowReturn
|
|
gst_app_src_push_internal (GstAppSrc * appsrc, GstBuffer * buffer,
|
|
GstBufferList * buflist, gboolean steal_ref)
|
|
{
|
|
gboolean first = TRUE;
|
|
GstAppSrcPrivate *priv;
|
|
|
|
g_return_val_if_fail (GST_IS_APP_SRC (appsrc), GST_FLOW_ERROR);
|
|
|
|
priv = appsrc->priv;
|
|
|
|
if (buffer != NULL)
|
|
g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
|
|
else
|
|
g_return_val_if_fail (GST_IS_BUFFER_LIST (buflist), GST_FLOW_ERROR);
|
|
|
|
if (buflist != NULL) {
|
|
if (gst_buffer_list_length (buflist) == 0)
|
|
return GST_FLOW_OK;
|
|
|
|
buffer = gst_buffer_list_get (buflist, 0);
|
|
}
|
|
|
|
if (GST_BUFFER_DTS (buffer) == GST_CLOCK_TIME_NONE &&
|
|
GST_BUFFER_PTS (buffer) == GST_CLOCK_TIME_NONE &&
|
|
gst_base_src_get_do_timestamp (GST_BASE_SRC_CAST (appsrc))) {
|
|
GstClock *clock;
|
|
|
|
clock = gst_element_get_clock (GST_ELEMENT_CAST (appsrc));
|
|
if (clock) {
|
|
GstClockTime now;
|
|
GstClockTime base_time =
|
|
gst_element_get_base_time (GST_ELEMENT_CAST (appsrc));
|
|
|
|
now = gst_clock_get_time (clock);
|
|
if (now > base_time)
|
|
now -= base_time;
|
|
else
|
|
now = 0;
|
|
gst_object_unref (clock);
|
|
|
|
if (buflist == NULL) {
|
|
if (!steal_ref) {
|
|
buffer = gst_buffer_copy (buffer);
|
|
steal_ref = TRUE;
|
|
} else {
|
|
buffer = gst_buffer_make_writable (buffer);
|
|
}
|
|
} else {
|
|
if (!steal_ref) {
|
|
buflist = gst_buffer_list_copy (buflist);
|
|
steal_ref = TRUE;
|
|
} else {
|
|
buflist = gst_buffer_list_make_writable (buflist);
|
|
}
|
|
buffer = gst_buffer_list_get_writable (buflist, 0);
|
|
}
|
|
|
|
GST_BUFFER_PTS (buffer) = now;
|
|
GST_BUFFER_DTS (buffer) = now;
|
|
} else {
|
|
GST_WARNING_OBJECT (appsrc,
|
|
"do-timestamp=TRUE but buffers are provided before "
|
|
"reaching the PLAYING state and having a clock. Timestamps will "
|
|
"not be accurate!");
|
|
}
|
|
}
|
|
|
|
g_mutex_lock (&priv->mutex);
|
|
|
|
while (TRUE) {
|
|
/* can't accept buffers when we are flushing or EOS */
|
|
if (priv->flushing)
|
|
goto flushing;
|
|
|
|
if (priv->is_eos)
|
|
goto eos;
|
|
|
|
if ((priv->max_bytes && priv->queued_bytes >= priv->max_bytes) ||
|
|
(priv->max_buffers && priv->queued_buffers >= priv->max_buffers) ||
|
|
(priv->max_time && priv->queued_time >= priv->max_time)) {
|
|
GST_DEBUG_OBJECT (appsrc,
|
|
"queue filled (queued %" G_GUINT64_FORMAT " bytes, max %"
|
|
G_GUINT64_FORMAT " bytes, " "queued %" G_GUINT64_FORMAT
|
|
" buffers, max %" G_GUINT64_FORMAT " buffers, " "queued %"
|
|
GST_TIME_FORMAT " time, max %" GST_TIME_FORMAT " time)",
|
|
priv->queued_bytes, priv->max_bytes, priv->queued_buffers,
|
|
priv->max_buffers, GST_TIME_ARGS (priv->queued_time),
|
|
GST_TIME_ARGS (priv->max_time));
|
|
|
|
if (first) {
|
|
Callbacks *callbacks = NULL;
|
|
gboolean emit;
|
|
|
|
emit = priv->emit_signals;
|
|
if (priv->callbacks)
|
|
callbacks = callbacks_ref (priv->callbacks);
|
|
/* only signal on the first push */
|
|
g_mutex_unlock (&priv->mutex);
|
|
|
|
if (callbacks && callbacks->callbacks.enough_data)
|
|
callbacks->callbacks.enough_data (appsrc, callbacks->user_data);
|
|
else if (emit)
|
|
g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_ENOUGH_DATA], 0,
|
|
NULL);
|
|
|
|
g_clear_pointer (&callbacks, callbacks_unref);
|
|
|
|
g_mutex_lock (&priv->mutex);
|
|
}
|
|
|
|
if (priv->leaky_type == GST_APP_LEAKY_TYPE_UPSTREAM) {
|
|
priv->need_discont_upstream = TRUE;
|
|
goto dropped;
|
|
} else if (priv->leaky_type == GST_APP_LEAKY_TYPE_DOWNSTREAM) {
|
|
guint i, length = gst_queue_array_get_length (priv->queue);
|
|
GstMiniObject *item = NULL;
|
|
|
|
/* Find the oldest buffer or buffer list and drop it, then update the
|
|
* limits. Dropping one is sufficient to go below the limits again.
|
|
*/
|
|
for (i = 0; i < length; i++) {
|
|
item = gst_queue_array_peek_nth (priv->queue, i);
|
|
if (GST_IS_BUFFER (item) || GST_IS_BUFFER_LIST (item)) {
|
|
gst_queue_array_drop_element (priv->queue, i);
|
|
break;
|
|
}
|
|
/* To not accidentally have an event after the loop */
|
|
item = NULL;
|
|
}
|
|
|
|
if (!item) {
|
|
GST_FIXME_OBJECT (appsrc,
|
|
"No buffer or buffer list queued but queue is full");
|
|
/* This shouldn't really happen but in this case we can't really do
|
|
* anything apart from accepting the buffer / bufferlist */
|
|
break;
|
|
}
|
|
|
|
GST_WARNING_OBJECT (appsrc, "Dropping old item %" GST_PTR_FORMAT, item);
|
|
|
|
gst_app_src_update_queued_pop (appsrc, item, FALSE);
|
|
gst_mini_object_unref (item);
|
|
|
|
priv->need_discont_downstream = TRUE;
|
|
continue;
|
|
}
|
|
|
|
if (first) {
|
|
/* continue to check for flushing/eos after releasing the lock */
|
|
first = FALSE;
|
|
continue;
|
|
}
|
|
if (priv->block) {
|
|
GST_DEBUG_OBJECT (appsrc, "waiting for free space");
|
|
/* we are filled, wait until a buffer gets popped or when we
|
|
* flush. */
|
|
priv->wait_status |= APP_WAITING;
|
|
g_cond_wait (&priv->cond, &priv->mutex);
|
|
priv->wait_status &= ~APP_WAITING;
|
|
} else {
|
|
/* no need to wait for free space, we just pump more data into the
|
|
* queue hoping that the caller reacts to the enough-data signal and
|
|
* stops pushing buffers. */
|
|
break;
|
|
}
|
|
} else {
|
|
break;
|
|
}
|
|
}
|
|
|
|
if (priv->pending_custom_segment) {
|
|
GstEvent *event = gst_event_new_segment (&priv->last_segment);
|
|
|
|
GST_DEBUG_OBJECT (appsrc, "enqueue new segment %" GST_PTR_FORMAT, event);
|
|
gst_queue_array_push_tail (priv->queue, event);
|
|
priv->pending_custom_segment = FALSE;
|
|
}
|
|
|
|
if (buflist != NULL) {
|
|
/* Mark the first buffer of the buffer list as DISCONT if we previously
|
|
* dropped a buffer instead of queueing it */
|
|
if (priv->need_discont_upstream) {
|
|
if (!steal_ref) {
|
|
buflist = gst_buffer_list_copy (buflist);
|
|
steal_ref = TRUE;
|
|
} else {
|
|
buflist = gst_buffer_list_make_writable (buflist);
|
|
}
|
|
buffer = gst_buffer_list_get_writable (buflist, 0);
|
|
GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
|
|
priv->need_discont_upstream = FALSE;
|
|
}
|
|
|
|
GST_DEBUG_OBJECT (appsrc, "queueing buffer list %p", buflist);
|
|
|
|
if (!steal_ref)
|
|
gst_buffer_list_ref (buflist);
|
|
gst_queue_array_push_tail (priv->queue, buflist);
|
|
} else {
|
|
/* Mark the buffer as DISCONT if we previously dropped a buffer instead of
|
|
* queueing it */
|
|
if (priv->need_discont_upstream) {
|
|
if (!steal_ref) {
|
|
buffer = gst_buffer_copy (buffer);
|
|
steal_ref = TRUE;
|
|
} else {
|
|
buffer = gst_buffer_make_writable (buffer);
|
|
}
|
|
GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
|
|
priv->need_discont_upstream = FALSE;
|
|
}
|
|
|
|
GST_DEBUG_OBJECT (appsrc, "queueing buffer %p", buffer);
|
|
if (!steal_ref)
|
|
gst_buffer_ref (buffer);
|
|
gst_queue_array_push_tail (priv->queue, buffer);
|
|
}
|
|
|
|
gst_app_src_update_queued_push (appsrc,
|
|
buflist ? GST_MINI_OBJECT_CAST (buflist) : GST_MINI_OBJECT_CAST (buffer));
|
|
|
|
if ((priv->wait_status & STREAM_WAITING))
|
|
g_cond_broadcast (&priv->cond);
|
|
|
|
g_mutex_unlock (&priv->mutex);
|
|
|
|
return GST_FLOW_OK;
|
|
|
|
/* ERRORS */
|
|
flushing:
|
|
{
|
|
GST_DEBUG_OBJECT (appsrc, "refuse buffer %p, we are flushing", buffer);
|
|
if (steal_ref) {
|
|
if (buflist)
|
|
gst_buffer_list_unref (buflist);
|
|
else
|
|
gst_buffer_unref (buffer);
|
|
}
|
|
g_mutex_unlock (&priv->mutex);
|
|
return GST_FLOW_FLUSHING;
|
|
}
|
|
eos:
|
|
{
|
|
GST_DEBUG_OBJECT (appsrc, "refuse buffer %p, we are EOS", buffer);
|
|
if (steal_ref) {
|
|
if (buflist)
|
|
gst_buffer_list_unref (buflist);
|
|
else
|
|
gst_buffer_unref (buffer);
|
|
}
|
|
g_mutex_unlock (&priv->mutex);
|
|
return GST_FLOW_EOS;
|
|
}
|
|
dropped:
|
|
{
|
|
GST_DEBUG_OBJECT (appsrc, "dropped new buffer %p, we are full", buffer);
|
|
if (steal_ref) {
|
|
if (buflist)
|
|
gst_buffer_list_unref (buflist);
|
|
else
|
|
gst_buffer_unref (buffer);
|
|
}
|
|
g_mutex_unlock (&priv->mutex);
|
|
return GST_FLOW_EOS;
|
|
}
|
|
}
|
|
|
|
static GstFlowReturn
|
|
gst_app_src_push_buffer_full (GstAppSrc * appsrc, GstBuffer * buffer,
|
|
gboolean steal_ref)
|
|
{
|
|
return gst_app_src_push_internal (appsrc, buffer, NULL, steal_ref);
|
|
}
|
|
|
|
static GstFlowReturn
|
|
gst_app_src_push_sample_internal (GstAppSrc * appsrc, GstSample * sample)
|
|
{
|
|
GstAppSrcPrivate *priv = appsrc->priv;
|
|
GstBufferList *buffer_list;
|
|
GstBuffer *buffer;
|
|
GstCaps *caps;
|
|
|
|
g_return_val_if_fail (GST_IS_SAMPLE (sample), GST_FLOW_ERROR);
|
|
|
|
caps = gst_sample_get_caps (sample);
|
|
if (caps != NULL) {
|
|
gst_app_src_set_caps (appsrc, caps);
|
|
} else {
|
|
GST_WARNING_OBJECT (appsrc, "received sample without caps");
|
|
}
|
|
|
|
if (priv->handle_segment_change && priv->format == GST_FORMAT_TIME) {
|
|
GstSegment *segment = gst_sample_get_segment (sample);
|
|
|
|
if (segment->format != GST_FORMAT_TIME) {
|
|
GST_LOG_OBJECT (appsrc, "format %s is not supported",
|
|
gst_format_get_name (segment->format));
|
|
goto handle_buffer;
|
|
}
|
|
|
|
g_mutex_lock (&priv->mutex);
|
|
if (gst_segment_is_equal (&priv->last_segment, segment)) {
|
|
GST_LOG_OBJECT (appsrc, "segment wasn't changed");
|
|
g_mutex_unlock (&priv->mutex);
|
|
goto handle_buffer;
|
|
} else {
|
|
GST_LOG_OBJECT (appsrc,
|
|
"segment changed %" GST_SEGMENT_FORMAT " -> %" GST_SEGMENT_FORMAT,
|
|
&priv->last_segment, segment);
|
|
}
|
|
|
|
/* will be pushed to queue with next buffer/buffer-list */
|
|
gst_segment_copy_into (segment, &priv->last_segment);
|
|
priv->pending_custom_segment = TRUE;
|
|
g_mutex_unlock (&priv->mutex);
|
|
}
|
|
|
|
handle_buffer:
|
|
|
|
buffer = gst_sample_get_buffer (sample);
|
|
if (buffer != NULL)
|
|
return gst_app_src_push_buffer_full (appsrc, buffer, FALSE);
|
|
|
|
buffer_list = gst_sample_get_buffer_list (sample);
|
|
if (buffer_list != NULL)
|
|
return gst_app_src_push_internal (appsrc, NULL, buffer_list, FALSE);
|
|
|
|
GST_WARNING_OBJECT (appsrc, "received sample without buffer or buffer list");
|
|
return GST_FLOW_OK;
|
|
}
|
|
|
|
/**
|
|
* gst_app_src_push_buffer:
|
|
* @appsrc: a #GstAppSrc
|
|
* @buffer: (transfer full): a #GstBuffer to push
|
|
*
|
|
* Adds a buffer to the queue of buffers that the appsrc element will
|
|
* push to its source pad. This function takes ownership of the buffer.
|
|
*
|
|
* When the block property is TRUE, this function can block until free
|
|
* space becomes available in the queue.
|
|
*
|
|
* Returns: #GST_FLOW_OK when the buffer was successfully queued.
|
|
* #GST_FLOW_FLUSHING when @appsrc is not PAUSED or PLAYING.
|
|
* #GST_FLOW_EOS when EOS occurred.
|
|
*/
|
|
GstFlowReturn
|
|
gst_app_src_push_buffer (GstAppSrc * appsrc, GstBuffer * buffer)
|
|
{
|
|
return gst_app_src_push_buffer_full (appsrc, buffer, TRUE);
|
|
}
|
|
|
|
/**
|
|
* gst_app_src_push_buffer_list:
|
|
* @appsrc: a #GstAppSrc
|
|
* @buffer_list: (transfer full): a #GstBufferList to push
|
|
*
|
|
* Adds a buffer list to the queue of buffers and buffer lists that the
|
|
* appsrc element will push to its source pad. This function takes ownership
|
|
* of @buffer_list.
|
|
*
|
|
* When the block property is TRUE, this function can block until free
|
|
* space becomes available in the queue.
|
|
*
|
|
* Returns: #GST_FLOW_OK when the buffer list was successfully queued.
|
|
* #GST_FLOW_FLUSHING when @appsrc is not PAUSED or PLAYING.
|
|
* #GST_FLOW_EOS when EOS occurred.
|
|
*
|
|
* Since: 1.14
|
|
*/
|
|
GstFlowReturn
|
|
gst_app_src_push_buffer_list (GstAppSrc * appsrc, GstBufferList * buffer_list)
|
|
{
|
|
return gst_app_src_push_internal (appsrc, NULL, buffer_list, TRUE);
|
|
}
|
|
|
|
/**
|
|
* gst_app_src_push_sample:
|
|
* @appsrc: a #GstAppSrc
|
|
* @sample: (transfer none): a #GstSample from which buffer and caps may be
|
|
* extracted
|
|
*
|
|
* Extract a buffer from the provided sample and adds it to the queue of
|
|
* buffers that the appsrc element will push to its source pad. Any
|
|
* previous caps that were set on appsrc will be replaced by the caps
|
|
* associated with the sample if not equal.
|
|
*
|
|
* This function does not take ownership of the
|
|
* sample so the sample needs to be unreffed after calling this function.
|
|
*
|
|
* When the block property is TRUE, this function can block until free
|
|
* space becomes available in the queue.
|
|
*
|
|
* Returns: #GST_FLOW_OK when the buffer was successfully queued.
|
|
* #GST_FLOW_FLUSHING when @appsrc is not PAUSED or PLAYING.
|
|
* #GST_FLOW_EOS when EOS occurred.
|
|
*
|
|
* Since: 1.6
|
|
*
|
|
*/
|
|
GstFlowReturn
|
|
gst_app_src_push_sample (GstAppSrc * appsrc, GstSample * sample)
|
|
{
|
|
return gst_app_src_push_sample_internal (appsrc, sample);
|
|
}
|
|
|
|
/* push a buffer without stealing the ref of the buffer. This is used for the
|
|
* action signal. */
|
|
static GstFlowReturn
|
|
gst_app_src_push_buffer_action (GstAppSrc * appsrc, GstBuffer * buffer)
|
|
{
|
|
return gst_app_src_push_buffer_full (appsrc, buffer, FALSE);
|
|
}
|
|
|
|
/* push a buffer list without stealing the ref of the buffer list. This is
|
|
* used for the action signal. */
|
|
static GstFlowReturn
|
|
gst_app_src_push_buffer_list_action (GstAppSrc * appsrc,
|
|
GstBufferList * buffer_list)
|
|
{
|
|
return gst_app_src_push_internal (appsrc, NULL, buffer_list, FALSE);
|
|
}
|
|
|
|
/* push a sample without stealing the ref. This is used for the
|
|
* action signal. */
|
|
static GstFlowReturn
|
|
gst_app_src_push_sample_action (GstAppSrc * appsrc, GstSample * sample)
|
|
{
|
|
return gst_app_src_push_sample_internal (appsrc, sample);
|
|
}
|
|
|
|
/**
|
|
* gst_app_src_end_of_stream:
|
|
* @appsrc: a #GstAppSrc
|
|
*
|
|
* Indicates to the appsrc element that the last buffer queued in the
|
|
* element is the last buffer of the stream.
|
|
*
|
|
* Returns: #GST_FLOW_OK when the EOS was successfully queued.
|
|
* #GST_FLOW_FLUSHING when @appsrc is not PAUSED or PLAYING.
|
|
*/
|
|
GstFlowReturn
|
|
gst_app_src_end_of_stream (GstAppSrc * appsrc)
|
|
{
|
|
GstAppSrcPrivate *priv;
|
|
|
|
g_return_val_if_fail (GST_IS_APP_SRC (appsrc), GST_FLOW_ERROR);
|
|
|
|
priv = appsrc->priv;
|
|
|
|
g_mutex_lock (&priv->mutex);
|
|
/* can't accept buffers when we are flushing. We can accept them when we are
|
|
* EOS although it will not do anything. */
|
|
if (priv->flushing)
|
|
goto flushing;
|
|
|
|
GST_DEBUG_OBJECT (appsrc, "sending EOS");
|
|
priv->is_eos = TRUE;
|
|
g_cond_broadcast (&priv->cond);
|
|
g_mutex_unlock (&priv->mutex);
|
|
|
|
return GST_FLOW_OK;
|
|
|
|
/* ERRORS */
|
|
flushing:
|
|
{
|
|
g_mutex_unlock (&priv->mutex);
|
|
GST_DEBUG_OBJECT (appsrc, "refuse EOS, we are flushing");
|
|
return GST_FLOW_FLUSHING;
|
|
}
|
|
}
|
|
|
|
/**
|
|
* gst_app_src_set_callbacks: (skip)
|
|
* @appsrc: a #GstAppSrc
|
|
* @callbacks: the callbacks
|
|
* @user_data: a user_data argument for the callbacks
|
|
* @notify: a destroy notify function
|
|
*
|
|
* Set callbacks which will be executed when data is needed, enough data has
|
|
* been collected or when a seek should be performed.
|
|
* This is an alternative to using the signals, it has lower overhead and is thus
|
|
* less expensive, but also less flexible.
|
|
*
|
|
* If callbacks are installed, no signals will be emitted for performance
|
|
* reasons.
|
|
*
|
|
* Before 1.16.3 it was not possible to change the callbacks in a thread-safe
|
|
* way.
|
|
*/
|
|
void
|
|
gst_app_src_set_callbacks (GstAppSrc * appsrc,
|
|
GstAppSrcCallbacks * callbacks, gpointer user_data, GDestroyNotify notify)
|
|
{
|
|
Callbacks *old_callbacks, *new_callbacks = NULL;
|
|
GstAppSrcPrivate *priv;
|
|
|
|
g_return_if_fail (GST_IS_APP_SRC (appsrc));
|
|
g_return_if_fail (callbacks != NULL);
|
|
|
|
priv = appsrc->priv;
|
|
|
|
if (callbacks) {
|
|
new_callbacks = g_new0 (Callbacks, 1);
|
|
new_callbacks->callbacks = *callbacks;
|
|
new_callbacks->user_data = user_data;
|
|
new_callbacks->destroy_notify = notify;
|
|
new_callbacks->ref_count = 1;
|
|
}
|
|
|
|
g_mutex_lock (&priv->mutex);
|
|
old_callbacks = g_steal_pointer (&priv->callbacks);
|
|
priv->callbacks = g_steal_pointer (&new_callbacks);
|
|
g_mutex_unlock (&priv->mutex);
|
|
|
|
g_clear_pointer (&old_callbacks, callbacks_unref);
|
|
}
|
|
|
|
/*** GSTURIHANDLER INTERFACE *************************************************/
|
|
|
|
static GstURIType
|
|
gst_app_src_uri_get_type (GType type)
|
|
{
|
|
return GST_URI_SRC;
|
|
}
|
|
|
|
static const gchar *const *
|
|
gst_app_src_uri_get_protocols (GType type)
|
|
{
|
|
static const gchar *protocols[] = { "appsrc", NULL };
|
|
|
|
return protocols;
|
|
}
|
|
|
|
static gchar *
|
|
gst_app_src_uri_get_uri (GstURIHandler * handler)
|
|
{
|
|
GstAppSrc *appsrc = GST_APP_SRC (handler);
|
|
|
|
return appsrc->priv->uri ? g_strdup (appsrc->priv->uri) : NULL;
|
|
}
|
|
|
|
static gboolean
|
|
gst_app_src_uri_set_uri (GstURIHandler * handler, const gchar * uri,
|
|
GError ** error)
|
|
{
|
|
GstAppSrc *appsrc = GST_APP_SRC (handler);
|
|
|
|
g_free (appsrc->priv->uri);
|
|
appsrc->priv->uri = uri ? g_strdup (uri) : NULL;
|
|
|
|
return TRUE;
|
|
}
|
|
|
|
static void
|
|
gst_app_src_uri_handler_init (gpointer g_iface, gpointer iface_data)
|
|
{
|
|
GstURIHandlerInterface *iface = (GstURIHandlerInterface *) g_iface;
|
|
|
|
iface->get_type = gst_app_src_uri_get_type;
|
|
iface->get_protocols = gst_app_src_uri_get_protocols;
|
|
iface->get_uri = gst_app_src_uri_get_uri;
|
|
iface->set_uri = gst_app_src_uri_set_uri;
|
|
}
|
|
|
|
static gboolean
|
|
gst_app_src_event (GstBaseSrc * src, GstEvent * event)
|
|
{
|
|
GstAppSrc *appsrc = GST_APP_SRC_CAST (src);
|
|
GstAppSrcPrivate *priv = appsrc->priv;
|
|
|
|
switch (GST_EVENT_TYPE (event)) {
|
|
case GST_EVENT_FLUSH_STOP:
|
|
g_mutex_lock (&priv->mutex);
|
|
priv->is_eos = FALSE;
|
|
g_mutex_unlock (&priv->mutex);
|
|
break;
|
|
default:
|
|
break;
|
|
}
|
|
|
|
return GST_BASE_SRC_CLASS (parent_class)->event (src, event);
|
|
}
|