/* GStreamer * Copyright (C) 2007 David Schleef * (C) 2008 Wim Taymans * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Library General Public * License as published by the Free Software Foundation; either * version 2 of the License, or (at your option) any later version. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Library General Public License for more details. * * You should have received a copy of the GNU Library General Public * License along with this library; if not, write to the * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, * Boston, MA 02110-1301, USA. */ /** * SECTION:gstappsrc * @title: GstAppSrc * @short_description: Easy way for applications to inject buffers into a * pipeline * @see_also: #GstBaseSrc, appsink * * The appsrc element can be used by applications to insert data into a * GStreamer pipeline. Unlike most GStreamer elements, appsrc provides * external API functions. * * appsrc can be used by linking with the libgstapp library to access the * methods directly or by using the appsrc action signals. * * Before operating appsrc, the caps property must be set to fixed caps * describing the format of the data that will be pushed with appsrc. An * exception to this is when pushing buffers with unknown caps, in which case no * caps should be set. This is typically true of file-like sources that push raw * byte buffers. If you don't want to explicitly set the caps, you can use * gst_app_src_push_sample. This method gets the caps associated with the * sample and sets them on the appsrc replacing any previously set caps (if * different from sample's caps). * * The main way of handing data to the appsrc element is by calling the * gst_app_src_push_buffer() method or by emitting the push-buffer action signal. 
* This will put the buffer onto a queue from which appsrc will read in its
 * streaming thread. It is important to note that data transport will not happen
 * from the thread that performed the push-buffer call.
 *
 * The "max-bytes", "max-buffers" and "max-time" properties control how much
 * data can be queued in appsrc before appsrc considers the queue full. A
 * filled internal queue will always emit the "enough-data" signal, which
 * signals the application that it should stop pushing data into appsrc. The
 * "block" property will cause appsrc to block the push-buffer method until
 * free space becomes available again.
 *
 * When the internal queue is running out of data, the "need-data" signal is
 * emitted, which signals the application that it should start pushing more data
 * into appsrc.
 *
 * In addition to the "need-data" and "enough-data" signals, appsrc can emit the
 * "seek-data" signal when the "stream-type" property is set to "seekable" or
 * "random-access". The signal argument will contain the new desired position in
 * the stream expressed in the unit set with the "format" property. After
 * receiving the seek-data signal, the application should push buffers from the
 * new position.
 *
 * These signals allow the application to operate the appsrc in two different
 * ways:
 *
 * The push mode, in which the application repeatedly calls the push-buffer/push-sample
 * method with a new buffer/sample. Optionally, the queue size in the appsrc
 * can be controlled with the enough-data and need-data signals by respectively
 * stopping/starting the push-buffer/push-sample calls. This is a typical
 * mode of operation for the stream-type "stream" and "seekable". Use this
 * mode when implementing various network protocols or hardware devices.
 *
 * The pull mode, in which the need-data signal triggers the next push-buffer call.
 * This mode is typically used in the "random-access" stream-type. Use this
 * mode for file access or other randomly accessible sources.
In this mode, a * buffer of exactly the amount of bytes given by the need-data signal should be * pushed into appsrc. * * In all modes, the size property on appsrc should contain the total stream * size in bytes. Setting this property is mandatory in the random-access mode. * For the stream and seekable modes, setting this property is optional but * recommended. * * When the application has finished pushing data into appsrc, it should call * gst_app_src_end_of_stream() or emit the end-of-stream action signal. After * this call, no more buffers can be pushed into appsrc until a flushing seek * occurs or the state of the appsrc has gone through READY. */ #ifdef HAVE_CONFIG_H #include "config.h" #endif #include #include #include #include "gstappsrc.h" typedef enum { NOONE_WAITING = 0, STREAM_WAITING = 1 << 0, /* streaming thread is waiting for application thread */ APP_WAITING = 1 << 1, /* application thread is waiting for streaming thread */ } GstAppSrcWaitStatus; typedef struct { GstAppSrcCallbacks callbacks; gpointer user_data; GDestroyNotify destroy_notify; gint ref_count; } Callbacks; static Callbacks * callbacks_ref (Callbacks * callbacks) { g_atomic_int_inc (&callbacks->ref_count); return callbacks; } static void callbacks_unref (Callbacks * callbacks) { if (!g_atomic_int_dec_and_test (&callbacks->ref_count)) return; if (callbacks->destroy_notify) callbacks->destroy_notify (callbacks->user_data); g_free (callbacks); } struct _GstAppSrcPrivate { GCond cond; GMutex mutex; GstQueueArray *queue; GstAppSrcWaitStatus wait_status; GstCaps *last_caps; GstCaps *current_caps; /* last segment received on the input */ GstSegment last_segment; /* currently configured segment for the output */ GstSegment current_segment; /* queue up a segment event based on last_segment before * the next buffer of buffer list */ gboolean pending_custom_segment; /* the next buffer that will be queued needs a discont flag * because the previous one was dropped - GST_APP_LEAKY_TYPE_UPSTREAM */ 
gboolean need_discont_upstream; /* the next buffer that will be dequeued needs a discont flag * because the previous one was dropped - GST_APP_LEAKY_TYPE_DOWNSTREAM */ gboolean need_discont_downstream; gint64 size; GstClockTime duration; GstAppStreamType stream_type; guint64 max_bytes, max_buffers, max_time; GstFormat format; gboolean block; gchar *uri; gboolean flushing; gboolean started; gboolean is_eos; guint64 queued_bytes, queued_buffers; /* Used to calculate the current time level */ GstClockTime last_in_running_time, last_out_running_time; /* Updated based on the above whenever they change */ GstClockTime queued_time; guint64 offset; GstAppStreamType current_type; guint64 min_latency; guint64 max_latency; gboolean emit_signals; guint min_percent; gboolean handle_segment_change; GstAppLeakyType leaky_type; Callbacks *callbacks; }; GST_DEBUG_CATEGORY_STATIC (app_src_debug); #define GST_CAT_DEFAULT app_src_debug enum { /* signals */ SIGNAL_NEED_DATA, SIGNAL_ENOUGH_DATA, SIGNAL_SEEK_DATA, /* actions */ SIGNAL_PUSH_BUFFER, SIGNAL_END_OF_STREAM, SIGNAL_PUSH_SAMPLE, SIGNAL_PUSH_BUFFER_LIST, LAST_SIGNAL }; #define DEFAULT_PROP_SIZE -1 #define DEFAULT_PROP_STREAM_TYPE GST_APP_STREAM_TYPE_STREAM #define DEFAULT_PROP_MAX_BYTES 200000 #define DEFAULT_PROP_MAX_BUFFERS 0 #define DEFAULT_PROP_MAX_TIME (0 * GST_SECOND) #define DEFAULT_PROP_FORMAT GST_FORMAT_BYTES #define DEFAULT_PROP_BLOCK FALSE #define DEFAULT_PROP_IS_LIVE FALSE #define DEFAULT_PROP_MIN_LATENCY -1 #define DEFAULT_PROP_MAX_LATENCY -1 #define DEFAULT_PROP_EMIT_SIGNALS TRUE #define DEFAULT_PROP_MIN_PERCENT 0 #define DEFAULT_PROP_CURRENT_LEVEL_BYTES 0 #define DEFAULT_PROP_CURRENT_LEVEL_BUFFERS 0 #define DEFAULT_PROP_CURRENT_LEVEL_TIME 0 #define DEFAULT_PROP_DURATION GST_CLOCK_TIME_NONE #define DEFAULT_PROP_HANDLE_SEGMENT_CHANGE FALSE #define DEFAULT_PROP_LEAKY_TYPE GST_APP_LEAKY_TYPE_NONE enum { PROP_0, PROP_CAPS, PROP_SIZE, PROP_STREAM_TYPE, PROP_MAX_BYTES, PROP_MAX_BUFFERS, PROP_MAX_TIME, PROP_FORMAT, 
PROP_BLOCK,
  PROP_IS_LIVE,
  PROP_MIN_LATENCY,
  PROP_MAX_LATENCY,
  PROP_EMIT_SIGNALS,
  PROP_MIN_PERCENT,
  PROP_CURRENT_LEVEL_BYTES,
  PROP_CURRENT_LEVEL_BUFFERS,
  PROP_CURRENT_LEVEL_TIME,
  PROP_DURATION,
  PROP_HANDLE_SEGMENT_CHANGE,
  PROP_LEAKY_TYPE,
  PROP_LAST
};

/* The single, always-present source pad; it advertises ANY caps. */
static GstStaticPadTemplate gst_app_src_template =
GST_STATIC_PAD_TEMPLATE ("src",
    GST_PAD_SRC,
    GST_PAD_ALWAYS,
    GST_STATIC_CAPS_ANY);

/* Forward declarations of the file-local functions that are wired into the
 * class structures by the class initialiser. */

/* GstURIHandler interface */
static void gst_app_src_uri_handler_init (gpointer g_iface,
    gpointer iface_data);

/* GObject virtual methods */
static void gst_app_src_dispose (GObject * object);
static void gst_app_src_finalize (GObject * object);

static void gst_app_src_set_property (GObject * object, guint prop_id,
    const GValue * value, GParamSpec * pspec);
static void gst_app_src_get_property (GObject * object, guint prop_id,
    GValue * value, GParamSpec * pspec);

/* GstElement virtual method */
static gboolean gst_app_src_send_event (GstElement * element,
    GstEvent * event);

/* internal helper used by the min-latency / max-latency property setters */
static void gst_app_src_set_latencies (GstAppSrc * appsrc,
    gboolean do_min, guint64 min, gboolean do_max, guint64 max);

/* GstBaseSrc virtual methods */
static gboolean gst_app_src_negotiate (GstBaseSrc * basesrc);
static GstCaps *gst_app_src_internal_get_caps (GstBaseSrc * bsrc,
    GstCaps * filter);
static GstFlowReturn gst_app_src_create (GstBaseSrc * bsrc,
    guint64 offset, guint size, GstBuffer ** buf);
static gboolean gst_app_src_start (GstBaseSrc * bsrc);
static gboolean gst_app_src_stop (GstBaseSrc * bsrc);
static gboolean gst_app_src_unlock (GstBaseSrc * bsrc);
static gboolean gst_app_src_unlock_stop (GstBaseSrc * bsrc);
static gboolean gst_app_src_do_seek (GstBaseSrc * src, GstSegment * segment);
static gboolean gst_app_src_is_seekable (GstBaseSrc * src);
static gboolean gst_app_src_do_get_size (GstBaseSrc * src, guint64 * size);
static gboolean gst_app_src_query (GstBaseSrc * src, GstQuery * query);
static gboolean gst_app_src_event (GstBaseSrc * src, GstEvent * event);

/* default handlers of the action signals */
static GstFlowReturn gst_app_src_push_buffer_action (GstAppSrc * appsrc,
    GstBuffer * buffer);
static GstFlowReturn gst_app_src_push_buffer_list_action (GstAppSrc *
appsrc, GstBufferList * buffer_list);
static GstFlowReturn gst_app_src_push_sample_action (GstAppSrc * appsrc,
    GstSample * sample);

/* Signal ids, indexed by the SIGNAL_* enumeration and filled in by the class
 * initialiser. */
static guint gst_app_src_signals[LAST_SIGNAL] = { 0 };

#define gst_app_src_parent_class parent_class
G_DEFINE_TYPE_WITH_CODE (GstAppSrc, gst_app_src, GST_TYPE_BASE_SRC,
    G_ADD_PRIVATE (GstAppSrc)
    G_IMPLEMENT_INTERFACE (GST_TYPE_URI_HANDLER, gst_app_src_uri_handler_init));

/* Class initialiser: sets up the debug category, hooks the GObject virtual
 * methods, installs every property, creates the signals and action signals,
 * registers element metadata and the pad template, and finally wires the
 * GstElement / GstBaseSrc virtual methods and the action-signal handlers. */
static void
gst_app_src_class_init (GstAppSrcClass * klass)
{
  GObjectClass *gobject_class = (GObjectClass *) klass;
  GstElementClass *element_class = (GstElementClass *) klass;
  GstBaseSrcClass *basesrc_class = (GstBaseSrcClass *) klass;

  GST_DEBUG_CATEGORY_INIT (app_src_debug, "appsrc", 0, "appsrc element");

  gobject_class->dispose = gst_app_src_dispose;
  gobject_class->finalize = gst_app_src_finalize;

  gobject_class->set_property = gst_app_src_set_property;
  gobject_class->get_property = gst_app_src_get_property;

  /**
   * GstAppSrc:caps:
   *
   * The GstCaps that will be negotiated downstream and will be put
   * on outgoing buffers.
   */
  g_object_class_install_property (gobject_class, PROP_CAPS,
      g_param_spec_boxed ("caps", "Caps",
          "The allowed caps for the src pad", GST_TYPE_CAPS,
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
  /**
   * GstAppSrc:format:
   *
   * The format to use for segment events. When the source is producing
   * timestamped buffers this property should be set to GST_FORMAT_TIME.
   */
  g_object_class_install_property (gobject_class, PROP_FORMAT,
      g_param_spec_enum ("format", "Format",
          "The format of the segment events and seek", GST_TYPE_FORMAT,
          DEFAULT_PROP_FORMAT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
  /**
   * GstAppSrc:size:
   *
   * The total size in bytes of the data stream. If the total size is known, it
   * is recommended to configure it with this property.
   */
  g_object_class_install_property (gobject_class, PROP_SIZE,
      g_param_spec_int64 ("size", "Size",
          "The size of the data stream in bytes (-1 if unknown)",
          -1, G_MAXINT64, DEFAULT_PROP_SIZE,
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
  /**
   * GstAppSrc:stream-type:
   *
   * The type of stream that this source is producing. For seekable streams the
   * application should connect to the seek-data signal.
   */
  g_object_class_install_property (gobject_class, PROP_STREAM_TYPE,
      g_param_spec_enum ("stream-type", "Stream Type",
          "the type of the stream", GST_TYPE_APP_STREAM_TYPE,
          DEFAULT_PROP_STREAM_TYPE,
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
  /**
   * GstAppSrc:max-bytes:
   *
   * The maximum number of bytes that can be queued internally.
   * After the maximum number of bytes is queued, appsrc will emit the
   * "enough-data" signal.
   */
  g_object_class_install_property (gobject_class, PROP_MAX_BYTES,
      g_param_spec_uint64 ("max-bytes", "Max bytes",
          "The maximum number of bytes to queue internally (0 = unlimited)",
          0, G_MAXUINT64, DEFAULT_PROP_MAX_BYTES,
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
  /**
   * GstAppSrc:max-buffers:
   *
   * The maximum number of buffers that can be queued internally.
   * After the maximum number of buffers is queued, appsrc will emit the
   * "enough-data" signal.
   *
   * Since: 1.20
   */
  g_object_class_install_property (gobject_class, PROP_MAX_BUFFERS,
      g_param_spec_uint64 ("max-buffers", "Max buffers",
          "The maximum number of buffers to queue internally (0 = unlimited)",
          0, G_MAXUINT64, DEFAULT_PROP_MAX_BUFFERS,
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
  /**
   * GstAppSrc:max-time:
   *
   * The maximum amount of time that can be queued internally.
   * After the maximum amount of time is queued, appsrc will emit the
   * "enough-data" signal.
   *
   * Since: 1.20
   */
  g_object_class_install_property (gobject_class, PROP_MAX_TIME,
      g_param_spec_uint64 ("max-time", "Max time",
          "The maximum amount of time to queue internally (0 = unlimited)",
          0, G_MAXUINT64, DEFAULT_PROP_MAX_TIME,
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
  /**
   * GstAppSrc:block:
   *
   * When max-bytes are queued and after the enough-data signal has been emitted,
   * block any further push-buffer calls until the amount of queued bytes drops
   * below the max-bytes limit.
   */
  g_object_class_install_property (gobject_class, PROP_BLOCK,
      g_param_spec_boolean ("block", "Block",
          "Block push-buffer when max-bytes are queued",
          DEFAULT_PROP_BLOCK, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
  /**
   * GstAppSrc:is-live:
   *
   * Instruct the source to behave like a live source. This includes that it
   * will only push out buffers in the PLAYING state.
   */
  g_object_class_install_property (gobject_class, PROP_IS_LIVE,
      g_param_spec_boolean ("is-live", "Is Live",
          "Whether to act as a live source",
          DEFAULT_PROP_IS_LIVE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
  /**
   * GstAppSrc:min-latency:
   *
   * The minimum latency of the source. A value of -1 will use the default
   * latency calculations of #GstBaseSrc.
   */
  g_object_class_install_property (gobject_class, PROP_MIN_LATENCY,
      g_param_spec_int64 ("min-latency", "Min Latency",
          "The minimum latency (-1 = default)",
          -1, G_MAXINT64, DEFAULT_PROP_MIN_LATENCY,
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
  /**
   * GstAppSrc:max-latency:
   *
   * The maximum latency of the source. A value of -1 means an unlimited amount
   * of latency.
   */
  g_object_class_install_property (gobject_class, PROP_MAX_LATENCY,
      g_param_spec_int64 ("max-latency", "Max Latency",
          "The maximum latency (-1 = unlimited)",
          -1, G_MAXINT64, DEFAULT_PROP_MAX_LATENCY,
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
  /**
   * GstAppSrc:emit-signals:
   *
   * Make appsrc emit the "need-data", "enough-data" and "seek-data" signals.
   * This option is by default enabled for backwards compatibility reasons but
   * can be disabled when needed because signal emission is expensive.
   */
  g_object_class_install_property (gobject_class, PROP_EMIT_SIGNALS,
      g_param_spec_boolean ("emit-signals", "Emit signals",
          "Emit need-data, enough-data and seek-data signals",
          DEFAULT_PROP_EMIT_SIGNALS,
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
  /**
   * GstAppSrc:min-percent:
   *
   * Make appsrc emit the "need-data" signal when the amount of bytes in the
   * queue drops below this percentage of max-bytes.
   */
  g_object_class_install_property (gobject_class, PROP_MIN_PERCENT,
      g_param_spec_uint ("min-percent", "Min Percent",
          "Emit need-data when queued bytes drops below this percent of max-bytes",
          0, 100, DEFAULT_PROP_MIN_PERCENT,
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
  /**
   * GstAppSrc:current-level-bytes:
   *
   * The number of currently queued bytes inside appsrc.
   *
   * Since: 1.2
   */
  g_object_class_install_property (gobject_class, PROP_CURRENT_LEVEL_BYTES,
      g_param_spec_uint64 ("current-level-bytes", "Current Level Bytes",
          "The number of currently queued bytes",
          0, G_MAXUINT64, DEFAULT_PROP_CURRENT_LEVEL_BYTES,
          G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
  /**
   * GstAppSrc:current-level-buffers:
   *
   * The number of currently queued buffers inside appsrc.
   *
   * Since: 1.20
   */
  g_object_class_install_property (gobject_class, PROP_CURRENT_LEVEL_BUFFERS,
      g_param_spec_uint64 ("current-level-buffers", "Current Level Buffers",
          "The number of currently queued buffers",
          0, G_MAXUINT64, DEFAULT_PROP_CURRENT_LEVEL_BUFFERS,
          G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
  /**
   * GstAppSrc:current-level-time:
   *
   * The amount of currently queued time inside appsrc.
   *
   * Since: 1.20
   */
  g_object_class_install_property (gobject_class, PROP_CURRENT_LEVEL_TIME,
      g_param_spec_uint64 ("current-level-time", "Current Level Time",
          "The amount of currently queued time",
          0, G_MAXUINT64, DEFAULT_PROP_CURRENT_LEVEL_TIME,
          G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
  /**
   * GstAppSrc:duration:
   *
   * The total duration in nanoseconds of the data stream. If the total
   * duration is known, it is recommended to configure it with this property.
   *
   * Since: 1.10
   */
  g_object_class_install_property (gobject_class, PROP_DURATION,
      g_param_spec_uint64 ("duration", "Duration",
          "The duration of the data stream in nanoseconds (GST_CLOCK_TIME_NONE if unknown)",
          0, G_MAXUINT64, DEFAULT_PROP_DURATION,
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
  /**
   * GstAppSrc:handle-segment-change:
   *
   * When enabled, appsrc will check the GstSegment in a GstSample which was
   * pushed via gst_app_src_push_sample() or the "push-sample" action signal.
   * If the GstSegment has changed, a corresponding segment event will be sent
   * before the next data flow.
   *
   * FIXME: currently only GST_FORMAT_TIME format is supported and therefore
   * #GstAppSrc:format should be time. However, possibly #GstAppSrc can support
   * other formats.
   *
   * Since: 1.18
   */
  g_object_class_install_property (gobject_class, PROP_HANDLE_SEGMENT_CHANGE,
      g_param_spec_boolean ("handle-segment-change", "Handle Segment Change",
          "Whether to detect and handle changed time format GstSegment in "
          "GstSample. User should set valid GstSegment in GstSample. "
          "Must set format property as \"time\" to enable this property",
          DEFAULT_PROP_HANDLE_SEGMENT_CHANGE,
          G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
          G_PARAM_STATIC_STRINGS));
  /**
   * GstAppSrc:leaky-type:
   *
   * When set to any other value than GST_APP_LEAKY_TYPE_NONE then the appsrc
   * will drop any buffers that are pushed into it once its internal queue is
   * full. The selected type defines whether to drop the oldest or new
   * buffers.
   *
   * Since: 1.20
   */
  g_object_class_install_property (gobject_class, PROP_LEAKY_TYPE,
      g_param_spec_enum ("leaky-type", "Leaky Type",
          "Whether to drop buffers once the internal queue is full",
          GST_TYPE_APP_LEAKY_TYPE, DEFAULT_PROP_LEAKY_TYPE,
          G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
          G_PARAM_STATIC_STRINGS));

  /**
   * GstAppSrc::need-data:
   * @appsrc: the appsrc element that emitted the signal
   * @length: the amount of bytes needed.
   *
   * Signal that the source needs more data. In the callback or from another
   * thread you should call push-buffer or end-of-stream.
   *
   * @length is just a hint and when it is set to -1, any number of bytes can be
   * pushed into @appsrc.
   *
   * You can call push-buffer multiple times until the enough-data signal is
   * fired.
   */
  gst_app_src_signals[SIGNAL_NEED_DATA] =
      g_signal_new ("need-data", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
      G_STRUCT_OFFSET (GstAppSrcClass, need_data),
      NULL, NULL, NULL, G_TYPE_NONE, 1, G_TYPE_UINT);

  /**
   * GstAppSrc::enough-data:
   * @appsrc: the appsrc element that emitted the signal
   *
   * Signal that the source has enough data. It is recommended that the
   * application stops calling push-buffer until the need-data signal is
   * emitted again to avoid excessive buffer queueing.
   */
  gst_app_src_signals[SIGNAL_ENOUGH_DATA] =
      g_signal_new ("enough-data", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
      G_STRUCT_OFFSET (GstAppSrcClass, enough_data),
      NULL, NULL, NULL, G_TYPE_NONE, 0, G_TYPE_NONE);

  /**
   * GstAppSrc::seek-data:
   * @appsrc: the appsrc element that emitted the signal
   * @offset: the offset to seek to
   *
   * Seek to the given offset. The next push-buffer should produce buffers from
   * the new @offset.
   * This callback is only called for seekable stream types.
   *
   * Returns: %TRUE if the seek succeeded.
   */
  gst_app_src_signals[SIGNAL_SEEK_DATA] =
      g_signal_new ("seek-data", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
      G_STRUCT_OFFSET (GstAppSrcClass, seek_data),
      NULL, NULL, NULL, G_TYPE_BOOLEAN, 1, G_TYPE_UINT64);

   /**
    * GstAppSrc::push-buffer:
    * @appsrc: the appsrc
    * @buffer: a buffer to push
    *
    * Adds a buffer to the queue of buffers that the appsrc element will
    * push to its source pad. This function does not take ownership of the
    * buffer so the buffer needs to be unreffed after calling this function.
    *
    * When the block property is TRUE, this function can block until free space
    * becomes available in the queue.
    */
  gst_app_src_signals[SIGNAL_PUSH_BUFFER] =
      g_signal_new ("push-buffer", G_TYPE_FROM_CLASS (klass),
      G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstAppSrcClass,
          push_buffer), NULL, NULL, NULL,
      GST_TYPE_FLOW_RETURN, 1, GST_TYPE_BUFFER);

   /**
    * GstAppSrc::push-buffer-list:
    * @appsrc: the appsrc
    * @buffer_list: a buffer list to push
    *
    * Adds a buffer list to the queue of buffers and buffer lists that the
    * appsrc element will push to its source pad. This function does not take
    * ownership of the buffer list so the buffer list needs to be unreffed
    * after calling this function.
    *
    * When the block property is TRUE, this function can block until free space
    * becomes available in the queue.
    *
    * Since: 1.14
    */
  gst_app_src_signals[SIGNAL_PUSH_BUFFER_LIST] =
      g_signal_new ("push-buffer-list", G_TYPE_FROM_CLASS (klass),
      G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstAppSrcClass,
          push_buffer_list), NULL, NULL, NULL,
      GST_TYPE_FLOW_RETURN, 1, GST_TYPE_BUFFER_LIST);

  /**
    * GstAppSrc::push-sample:
    * @appsrc: the appsrc
    * @sample: a sample from which to extract the buffer to push
    *
    * Extracts a buffer from the provided sample and adds the extracted buffer
    * to the queue of buffers that the appsrc element will
    * push to its source pad. This function sets the appsrc caps based on the caps
    * in the sample and resets the caps if they change.
    * Only the caps and the buffer of the provided sample are used and not
    * for example the segment in the sample.
    * This function does not take ownership of the
    * sample so the sample needs to be unreffed after calling this function.
    *
    * When the block property is TRUE, this function can block until free space
    * becomes available in the queue.
    *
    * Since: 1.6
    */
  gst_app_src_signals[SIGNAL_PUSH_SAMPLE] =
      g_signal_new ("push-sample", G_TYPE_FROM_CLASS (klass),
      G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstAppSrcClass,
          push_sample), NULL, NULL, NULL,
      GST_TYPE_FLOW_RETURN, 1, GST_TYPE_SAMPLE);


   /**
    * GstAppSrc::end-of-stream:
    * @appsrc: the appsrc
    *
    * Notify @appsrc that no more buffers are available.
    */
  gst_app_src_signals[SIGNAL_END_OF_STREAM] =
      g_signal_new ("end-of-stream", G_TYPE_FROM_CLASS (klass),
      G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstAppSrcClass,
          end_of_stream), NULL, NULL, NULL,
      GST_TYPE_FLOW_RETURN, 0, G_TYPE_NONE);

  /* NOTE(review): the author string below looks like it lost its e-mail
   * addresses (dangling " ," and trailing space) - confirm against the
   * canonical file before changing it, it is a runtime string. */
  gst_element_class_set_static_metadata (element_class, "AppSrc",
      "Generic/Source", "Allow the application to feed buffers to a pipeline",
      "David Schleef , Wim Taymans ");

  gst_element_class_add_static_pad_template (element_class,
      &gst_app_src_template);

  element_class->send_event = gst_app_src_send_event;

  basesrc_class->negotiate = gst_app_src_negotiate;
  basesrc_class->get_caps = gst_app_src_internal_get_caps;
  basesrc_class->create = gst_app_src_create;
  basesrc_class->start = gst_app_src_start;
  basesrc_class->stop = gst_app_src_stop;
  basesrc_class->unlock = gst_app_src_unlock;
  basesrc_class->unlock_stop = gst_app_src_unlock_stop;
  basesrc_class->do_seek = gst_app_src_do_seek;
  basesrc_class->is_seekable = gst_app_src_is_seekable;
  basesrc_class->get_size = gst_app_src_do_get_size;
  basesrc_class->query = gst_app_src_query;
  basesrc_class->event = gst_app_src_event;

  /* default handlers backing the action signals */
  klass->push_buffer = gst_app_src_push_buffer_action;
  klass->push_buffer_list = gst_app_src_push_buffer_list_action;
  klass->push_sample = gst_app_src_push_sample_action;
klass->end_of_stream = gst_app_src_end_of_stream; } static void gst_app_src_init (GstAppSrc * appsrc) { GstAppSrcPrivate *priv; priv = appsrc->priv = gst_app_src_get_instance_private (appsrc); g_mutex_init (&priv->mutex); g_cond_init (&priv->cond); priv->queue = gst_queue_array_new (16); priv->wait_status = NOONE_WAITING; priv->size = DEFAULT_PROP_SIZE; priv->duration = DEFAULT_PROP_DURATION; priv->stream_type = DEFAULT_PROP_STREAM_TYPE; priv->max_bytes = DEFAULT_PROP_MAX_BYTES; priv->max_buffers = DEFAULT_PROP_MAX_BUFFERS; priv->max_time = DEFAULT_PROP_MAX_TIME; priv->format = DEFAULT_PROP_FORMAT; priv->block = DEFAULT_PROP_BLOCK; priv->min_latency = DEFAULT_PROP_MIN_LATENCY; priv->max_latency = DEFAULT_PROP_MAX_LATENCY; priv->emit_signals = DEFAULT_PROP_EMIT_SIGNALS; priv->min_percent = DEFAULT_PROP_MIN_PERCENT; priv->handle_segment_change = DEFAULT_PROP_HANDLE_SEGMENT_CHANGE; priv->leaky_type = DEFAULT_PROP_LEAKY_TYPE; gst_base_src_set_live (GST_BASE_SRC (appsrc), DEFAULT_PROP_IS_LIVE); } /* Must be called with priv->mutex */ static void gst_app_src_flush_queued (GstAppSrc * src, gboolean retain_last_caps) { GstMiniObject *obj; GstAppSrcPrivate *priv = src->priv; GstCaps *requeue_caps = NULL; while (!gst_queue_array_is_empty (priv->queue)) { obj = gst_queue_array_pop_head (priv->queue); if (obj) { if (GST_IS_CAPS (obj) && retain_last_caps) { gst_caps_replace (&requeue_caps, GST_CAPS_CAST (obj)); } gst_mini_object_unref (obj); } } if (requeue_caps) { gst_queue_array_push_tail (priv->queue, requeue_caps); } priv->queued_bytes = 0; priv->queued_buffers = 0; priv->queued_time = 0; priv->last_in_running_time = GST_CLOCK_TIME_NONE; priv->last_out_running_time = GST_CLOCK_TIME_NONE; priv->need_discont_upstream = FALSE; priv->need_discont_downstream = FALSE; } static void gst_app_src_dispose (GObject * obj) { GstAppSrc *appsrc = GST_APP_SRC_CAST (obj); GstAppSrcPrivate *priv = appsrc->priv; Callbacks *callbacks = NULL; GST_OBJECT_LOCK (appsrc); if (priv->current_caps) { 
gst_caps_unref (priv->current_caps); priv->current_caps = NULL; } if (priv->last_caps) { gst_caps_unref (priv->last_caps); priv->last_caps = NULL; } GST_OBJECT_UNLOCK (appsrc); g_mutex_lock (&priv->mutex); if (priv->callbacks) callbacks = g_steal_pointer (&priv->callbacks); gst_app_src_flush_queued (appsrc, FALSE); g_mutex_unlock (&priv->mutex); g_clear_pointer (&callbacks, callbacks_unref); G_OBJECT_CLASS (parent_class)->dispose (obj); } static void gst_app_src_finalize (GObject * obj) { GstAppSrc *appsrc = GST_APP_SRC_CAST (obj); GstAppSrcPrivate *priv = appsrc->priv; g_mutex_clear (&priv->mutex); g_cond_clear (&priv->cond); gst_queue_array_free (priv->queue); g_free (priv->uri); G_OBJECT_CLASS (parent_class)->finalize (obj); } static GstCaps * gst_app_src_internal_get_caps (GstBaseSrc * bsrc, GstCaps * filter) { GstAppSrc *appsrc = GST_APP_SRC (bsrc); GstCaps *caps; GST_OBJECT_LOCK (appsrc); if ((caps = appsrc->priv->current_caps)) gst_caps_ref (caps); GST_OBJECT_UNLOCK (appsrc); if (filter) { if (caps) { GstCaps *intersection = gst_caps_intersect_full (filter, caps, GST_CAPS_INTERSECT_FIRST); gst_caps_unref (caps); caps = intersection; } else { caps = gst_caps_ref (filter); } } GST_DEBUG_OBJECT (appsrc, "caps: %" GST_PTR_FORMAT, caps); return caps; } static void gst_app_src_set_property (GObject * object, guint prop_id, const GValue * value, GParamSpec * pspec) { GstAppSrc *appsrc = GST_APP_SRC_CAST (object); GstAppSrcPrivate *priv = appsrc->priv; switch (prop_id) { case PROP_CAPS: gst_app_src_set_caps (appsrc, gst_value_get_caps (value)); break; case PROP_SIZE: gst_app_src_set_size (appsrc, g_value_get_int64 (value)); break; case PROP_STREAM_TYPE: gst_app_src_set_stream_type (appsrc, g_value_get_enum (value)); break; case PROP_MAX_BYTES: gst_app_src_set_max_bytes (appsrc, g_value_get_uint64 (value)); break; case PROP_MAX_BUFFERS: gst_app_src_set_max_buffers (appsrc, g_value_get_uint64 (value)); break; case PROP_MAX_TIME: gst_app_src_set_max_time (appsrc, 
g_value_get_uint64 (value));
      break;
      /* format, block, min-percent, handle-segment-change and leaky-type
       * have no setter function here: they are written straight into the
       * private struct, without taking priv->mutex */
    case PROP_FORMAT:
      priv->format = g_value_get_enum (value);
      break;
    case PROP_BLOCK:
      priv->block = g_value_get_boolean (value);
      break;
    case PROP_IS_LIVE:
      gst_base_src_set_live (GST_BASE_SRC (appsrc),
          g_value_get_boolean (value));
      break;
      /* each latency property updates only its own bound; the other one is
       * passed with its do_* flag set to FALSE */
    case PROP_MIN_LATENCY:
      gst_app_src_set_latencies (appsrc, TRUE, g_value_get_int64 (value),
          FALSE, -1);
      break;
    case PROP_MAX_LATENCY:
      gst_app_src_set_latencies (appsrc, FALSE, -1, TRUE,
          g_value_get_int64 (value));
      break;
    case PROP_EMIT_SIGNALS:
      gst_app_src_set_emit_signals (appsrc, g_value_get_boolean (value));
      break;
    case PROP_MIN_PERCENT:
      priv->min_percent = g_value_get_uint (value);
      break;
    case PROP_DURATION:
      gst_app_src_set_duration (appsrc, g_value_get_uint64 (value));
      break;
    case PROP_HANDLE_SEGMENT_CHANGE:
      priv->handle_segment_change = g_value_get_boolean (value);
      break;
    case PROP_LEAKY_TYPE:
      priv->leaky_type = g_value_get_enum (value);
      break;
    default:
      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
      break;
  }
}

/* GObject get_property implementation: copies the current value of the
 * property identified by @prop_id into @value. Most properties go through
 * the matching gst_app_src_get_*() accessor; a few are read directly from
 * the private struct. Unknown ids produce the standard GObject warning. */
static void
gst_app_src_get_property (GObject * object, guint prop_id, GValue * value,
    GParamSpec * pspec)
{
  GstAppSrc *appsrc = GST_APP_SRC_CAST (object);
  GstAppSrcPrivate *priv = appsrc->priv;

  switch (prop_id) {
    case PROP_CAPS:
      /* "take": the GValue assumes ownership of the returned caps */
      g_value_take_boxed (value, gst_app_src_get_caps (appsrc));
      break;
    case PROP_SIZE:
      g_value_set_int64 (value, gst_app_src_get_size (appsrc));
      break;
    case PROP_STREAM_TYPE:
      g_value_set_enum (value, gst_app_src_get_stream_type (appsrc));
      break;
    case PROP_MAX_BYTES:
      g_value_set_uint64 (value, gst_app_src_get_max_bytes (appsrc));
      break;
    case PROP_MAX_BUFFERS:
      g_value_set_uint64 (value, gst_app_src_get_max_buffers (appsrc));
      break;
    case PROP_MAX_TIME:
      g_value_set_uint64 (value, gst_app_src_get_max_time (appsrc));
      break;
    case PROP_FORMAT:
      g_value_set_enum (value, priv->format);
      break;
    case PROP_BLOCK:
      g_value_set_boolean (value, priv->block);
      break;
    case PROP_IS_LIVE:
      g_value_set_boolean (value, gst_base_src_is_live (GST_BASE_SRC (appsrc)));
      break;
    case
PROP_MIN_LATENCY: { guint64 min = 0; gst_app_src_get_latency (appsrc, &min, NULL); g_value_set_int64 (value, min); break; } case PROP_MAX_LATENCY: { guint64 max = 0; gst_app_src_get_latency (appsrc, NULL, &max); g_value_set_int64 (value, max); break; } case PROP_EMIT_SIGNALS: g_value_set_boolean (value, gst_app_src_get_emit_signals (appsrc)); break; case PROP_MIN_PERCENT: g_value_set_uint (value, priv->min_percent); break; case PROP_CURRENT_LEVEL_BYTES: g_value_set_uint64 (value, gst_app_src_get_current_level_bytes (appsrc)); break; case PROP_CURRENT_LEVEL_BUFFERS: g_value_set_uint64 (value, gst_app_src_get_current_level_buffers (appsrc)); break; case PROP_CURRENT_LEVEL_TIME: g_value_set_uint64 (value, gst_app_src_get_current_level_time (appsrc)); break; case PROP_DURATION: g_value_set_uint64 (value, gst_app_src_get_duration (appsrc)); break; case PROP_HANDLE_SEGMENT_CHANGE: g_value_set_boolean (value, priv->handle_segment_change); break; case PROP_LEAKY_TYPE: g_value_set_enum (value, priv->leaky_type); break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; } } static gboolean gst_app_src_send_event (GstElement * element, GstEvent * event) { GstAppSrc *appsrc = GST_APP_SRC_CAST (element); GstAppSrcPrivate *priv = appsrc->priv; switch (GST_EVENT_TYPE (event)) { case GST_EVENT_FLUSH_STOP: g_mutex_lock (&priv->mutex); gst_app_src_flush_queued (appsrc, TRUE); g_mutex_unlock (&priv->mutex); break; default: break; } return GST_CALL_PARENT_WITH_DEFAULT (GST_ELEMENT_CLASS, send_event, (element, event), FALSE); } static gboolean gst_app_src_unlock (GstBaseSrc * bsrc) { GstAppSrc *appsrc = GST_APP_SRC_CAST (bsrc); GstAppSrcPrivate *priv = appsrc->priv; g_mutex_lock (&priv->mutex); GST_DEBUG_OBJECT (appsrc, "unlock start"); priv->flushing = TRUE; g_cond_broadcast (&priv->cond); g_mutex_unlock (&priv->mutex); return TRUE; } static gboolean gst_app_src_unlock_stop (GstBaseSrc * bsrc) { GstAppSrc *appsrc = GST_APP_SRC_CAST (bsrc); GstAppSrcPrivate 
*priv = appsrc->priv; g_mutex_lock (&priv->mutex); GST_DEBUG_OBJECT (appsrc, "unlock stop"); priv->flushing = FALSE; g_cond_broadcast (&priv->cond); g_mutex_unlock (&priv->mutex); return TRUE; } static gboolean gst_app_src_start (GstBaseSrc * bsrc) { GstAppSrc *appsrc = GST_APP_SRC_CAST (bsrc); GstAppSrcPrivate *priv = appsrc->priv; g_mutex_lock (&priv->mutex); GST_DEBUG_OBJECT (appsrc, "starting"); priv->started = TRUE; /* set the offset to -1 so that we always do a first seek. This is only used * in random-access mode. */ priv->offset = -1; priv->flushing = FALSE; g_mutex_unlock (&priv->mutex); gst_base_src_set_format (bsrc, priv->format); gst_segment_init (&priv->last_segment, priv->format); gst_segment_init (&priv->current_segment, priv->format); priv->pending_custom_segment = FALSE; return TRUE; } static gboolean gst_app_src_stop (GstBaseSrc * bsrc) { GstAppSrc *appsrc = GST_APP_SRC_CAST (bsrc); GstAppSrcPrivate *priv = appsrc->priv; g_mutex_lock (&priv->mutex); GST_DEBUG_OBJECT (appsrc, "stopping"); priv->is_eos = FALSE; priv->flushing = TRUE; priv->started = FALSE; gst_app_src_flush_queued (appsrc, TRUE); g_cond_broadcast (&priv->cond); g_mutex_unlock (&priv->mutex); return TRUE; } static gboolean gst_app_src_is_seekable (GstBaseSrc * src) { GstAppSrc *appsrc = GST_APP_SRC_CAST (src); GstAppSrcPrivate *priv = appsrc->priv; gboolean res = FALSE; switch (priv->stream_type) { case GST_APP_STREAM_TYPE_STREAM: break; case GST_APP_STREAM_TYPE_SEEKABLE: case GST_APP_STREAM_TYPE_RANDOM_ACCESS: res = TRUE; break; } return res; } static gboolean gst_app_src_do_get_size (GstBaseSrc * src, guint64 * size) { GstAppSrc *appsrc = GST_APP_SRC_CAST (src); *size = gst_app_src_get_size (appsrc); return TRUE; } static gboolean gst_app_src_query (GstBaseSrc * src, GstQuery * query) { GstAppSrc *appsrc = GST_APP_SRC_CAST (src); GstAppSrcPrivate *priv = appsrc->priv; gboolean res; switch (GST_QUERY_TYPE (query)) { case GST_QUERY_LATENCY: { GstClockTime min, max; gboolean live; /* 
Query the parent class for the defaults */ res = gst_base_src_query_latency (src, &live, &min, &max); /* overwrite with our values when we need to */ g_mutex_lock (&priv->mutex); if (priv->min_latency != -1) { min = priv->min_latency; max = priv->max_latency; } g_mutex_unlock (&priv->mutex); gst_query_set_latency (query, live, min, max); break; } case GST_QUERY_SCHEDULING: { gst_query_set_scheduling (query, GST_SCHEDULING_FLAG_SEEKABLE, 1, -1, 0); gst_query_add_scheduling_mode (query, GST_PAD_MODE_PUSH); switch (priv->stream_type) { case GST_APP_STREAM_TYPE_STREAM: case GST_APP_STREAM_TYPE_SEEKABLE: break; case GST_APP_STREAM_TYPE_RANDOM_ACCESS: gst_query_add_scheduling_mode (query, GST_PAD_MODE_PULL); break; } res = TRUE; break; } case GST_QUERY_DURATION: { GstFormat format; gst_query_parse_duration (query, &format, NULL); if (format == GST_FORMAT_BYTES) { gst_query_set_duration (query, format, priv->size); res = TRUE; } else if (format == GST_FORMAT_TIME) { if (priv->duration != GST_CLOCK_TIME_NONE) { gst_query_set_duration (query, format, priv->duration); res = TRUE; } else { res = FALSE; } } else { res = FALSE; } break; } default: res = GST_BASE_SRC_CLASS (parent_class)->query (src, query); break; } return res; } /* will be called in push mode */ static gboolean gst_app_src_do_seek (GstBaseSrc * src, GstSegment * segment) { GstAppSrc *appsrc = GST_APP_SRC_CAST (src); GstAppSrcPrivate *priv = appsrc->priv; gint64 desired_position; gboolean res = FALSE; gboolean emit; Callbacks *callbacks = NULL; desired_position = segment->position; /* no need to try to seek in streaming mode */ if (priv->stream_type == GST_APP_STREAM_TYPE_STREAM) return TRUE; GST_DEBUG_OBJECT (appsrc, "seeking to %" G_GINT64_FORMAT ", format %s", desired_position, gst_format_get_name (segment->format)); g_mutex_lock (&priv->mutex); emit = priv->emit_signals; if (priv->callbacks) callbacks = callbacks_ref (priv->callbacks); g_mutex_unlock (&priv->mutex); if (callbacks && 
callbacks->callbacks.seek_data) { res = callbacks->callbacks.seek_data (appsrc, desired_position, callbacks->user_data); } else if (emit) { g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_SEEK_DATA], 0, desired_position, &res); } g_clear_pointer (&callbacks, callbacks_unref); if (res) { GST_DEBUG_OBJECT (appsrc, "flushing queue"); g_mutex_lock (&priv->mutex); gst_app_src_flush_queued (appsrc, TRUE); gst_segment_copy_into (segment, &priv->last_segment); gst_segment_copy_into (segment, &priv->current_segment); priv->pending_custom_segment = FALSE; g_mutex_unlock (&priv->mutex); priv->is_eos = FALSE; } else { GST_WARNING_OBJECT (appsrc, "seek failed"); } return res; } /* must be called with the appsrc mutex */ static gboolean gst_app_src_emit_seek (GstAppSrc * appsrc, guint64 offset) { gboolean res = FALSE; gboolean emit; GstAppSrcPrivate *priv = appsrc->priv; Callbacks *callbacks = NULL; emit = priv->emit_signals; if (priv->callbacks) callbacks = callbacks_ref (priv->callbacks); g_mutex_unlock (&priv->mutex); GST_DEBUG_OBJECT (appsrc, "we are at %" G_GINT64_FORMAT ", seek to %" G_GINT64_FORMAT, priv->offset, offset); if (callbacks && callbacks->callbacks.seek_data) res = callbacks->callbacks.seek_data (appsrc, offset, callbacks->user_data); else if (emit) g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_SEEK_DATA], 0, offset, &res); g_clear_pointer (&callbacks, callbacks_unref); g_mutex_lock (&priv->mutex); return res; } /* must be called with the appsrc mutex. After this call things can be * flushing */ static void gst_app_src_emit_need_data (GstAppSrc * appsrc, guint size) { gboolean emit; GstAppSrcPrivate *priv = appsrc->priv; Callbacks *callbacks = NULL; emit = priv->emit_signals; if (priv->callbacks) callbacks = callbacks_ref (priv->callbacks); g_mutex_unlock (&priv->mutex); /* we have no data, we need some. We fire the signal with the size hint. 
*/ if (callbacks && callbacks->callbacks.need_data) callbacks->callbacks.need_data (appsrc, size, callbacks->user_data); else if (emit) g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_NEED_DATA], 0, size, NULL); g_clear_pointer (&callbacks, callbacks_unref); g_mutex_lock (&priv->mutex); /* we can be flushing now because we released the lock */ } /* must be called with the appsrc mutex */ static gboolean gst_app_src_do_negotiate (GstBaseSrc * basesrc) { GstAppSrc *appsrc = GST_APP_SRC_CAST (basesrc); GstAppSrcPrivate *priv = appsrc->priv; gboolean result; GstCaps *caps; GST_OBJECT_LOCK (basesrc); caps = priv->current_caps ? gst_caps_ref (priv->current_caps) : NULL; GST_OBJECT_UNLOCK (basesrc); /* Avoid deadlock by unlocking mutex * otherwise we get deadlock between this and stream lock */ g_mutex_unlock (&priv->mutex); if (caps) { result = gst_base_src_set_caps (basesrc, caps); gst_caps_unref (caps); } else { result = GST_BASE_SRC_CLASS (parent_class)->negotiate (basesrc); } g_mutex_lock (&priv->mutex); return result; } static gboolean gst_app_src_negotiate (GstBaseSrc * basesrc) { GstAppSrc *appsrc = GST_APP_SRC_CAST (basesrc); GstAppSrcPrivate *priv = appsrc->priv; gboolean result; g_mutex_lock (&priv->mutex); result = gst_app_src_do_negotiate (basesrc); g_mutex_unlock (&priv->mutex); return result; } /* Update the currently queued bytes/buffers/time information for the item * that was just removed from the queue. * * If update_offset is set, additionally the offset of the source will be * moved forward accordingly as if that many bytes were output. 
*/
static void
gst_app_src_update_queued_pop (GstAppSrc * appsrc, GstMiniObject * item,
    gboolean update_offset)
{
  GstAppSrcPrivate *priv = appsrc->priv;
  guint buf_size = 0;
  guint n_buffers = 0;
  GstClockTime end_buffer_ts = GST_CLOCK_TIME_NONE;

  /* Work out how many bytes/buffers the item accounts for and the end
   * timestamp (DTS or PTS, plus the duration when valid) it reaches. Items
   * that are neither a buffer nor a buffer list count as nothing. */
  if (GST_IS_BUFFER (item)) {
    GstBuffer *buf = GST_BUFFER_CAST (item);
    buf_size = gst_buffer_get_size (buf);
    n_buffers = 1;

    end_buffer_ts = GST_BUFFER_DTS_OR_PTS (buf);
    if (end_buffer_ts != GST_CLOCK_TIME_NONE
        && GST_BUFFER_DURATION_IS_VALID (buf))
      end_buffer_ts += GST_BUFFER_DURATION (buf);

    GST_LOG_OBJECT (appsrc, "have buffer %p of size %u", buf, buf_size);
  } else if (GST_IS_BUFFER_LIST (item)) {
    GstBufferList *buffer_list = GST_BUFFER_LIST_CAST (item);
    guint i;

    n_buffers = gst_buffer_list_length (buffer_list);

    for (i = 0; i < n_buffers; i++) {
      GstBuffer *tmp = gst_buffer_list_get (buffer_list, i);
      GstClockTime ts = GST_BUFFER_DTS_OR_PTS (tmp);

      buf_size += gst_buffer_get_size (tmp);
      /* Update to the last buffer's timestamp that is known */
      if (ts != GST_CLOCK_TIME_NONE) {
        end_buffer_ts = ts;
        if (GST_BUFFER_DURATION_IS_VALID (tmp))
          end_buffer_ts += GST_BUFFER_DURATION (tmp);
      }
    }
  }

  priv->queued_bytes -= buf_size;
  priv->queued_buffers -= n_buffers;

  /* Update time level if working on a TIME segment */
  if (priv->current_segment.format == GST_FORMAT_TIME
      && end_buffer_ts != GST_CLOCK_TIME_NONE) {
    /* Clip to the current segment boundaries */
    if (priv->current_segment.stop != -1
        && end_buffer_ts > priv->current_segment.stop)
      end_buffer_ts = priv->current_segment.stop;
    else if (priv->current_segment.start > end_buffer_ts)
      end_buffer_ts = priv->current_segment.start;

    priv->last_out_running_time =
        gst_segment_to_running_time (&priv->current_segment,
        GST_FORMAT_TIME, end_buffer_ts);

    GST_TRACE_OBJECT (appsrc,
        "Last in running time %" GST_TIME_FORMAT ", last out running time %"
        GST_TIME_FORMAT, GST_TIME_ARGS (priv->last_in_running_time),
        GST_TIME_ARGS (priv->last_out_running_time));

    /* If timestamps on both sides are known, calculate the current
     * fill level in time and consider the queue empty if the output
     * running time is greater than the input one (i.e. some kind of reset
     * has happened). */
    if (priv->last_out_running_time != GST_CLOCK_TIME_NONE
        && priv->last_in_running_time != GST_CLOCK_TIME_NONE) {
      if (priv->last_out_running_time > priv->last_in_running_time) {
        priv->queued_time = 0;
      } else {
        priv->queued_time =
            priv->last_in_running_time - priv->last_out_running_time;
      }
    }
  }

  GST_DEBUG_OBJECT (appsrc,
      "Currently queued: %" G_GUINT64_FORMAT " bytes, %" G_GUINT64_FORMAT
      " buffers, %" GST_TIME_FORMAT, priv->queued_bytes,
      priv->queued_buffers, GST_TIME_ARGS (priv->queued_time));

  /* only update the offset when in random_access mode and when requested by
   * the caller, i.e. not when just dropping the item */
  if (update_offset && priv->stream_type == GST_APP_STREAM_TYPE_RANDOM_ACCESS)
    priv->offset += buf_size;
}

/* Update the currently queued bytes/buffers/time information for the item
 * that was just added to the queue. */
static void
gst_app_src_update_queued_push (GstAppSrc * appsrc, GstMiniObject * item)
{
  GstAppSrcPrivate *priv = appsrc->priv;
  GstClockTime start_buffer_ts = GST_CLOCK_TIME_NONE;
  GstClockTime end_buffer_ts = GST_CLOCK_TIME_NONE;
  guint buf_size = 0;
  guint n_buffers = 0;

  /* Same accounting as on the pop side, but the first known timestamp of
   * the item is remembered as well. */
  if (GST_IS_BUFFER (item)) {
    GstBuffer *buf = GST_BUFFER_CAST (item);

    buf_size = gst_buffer_get_size (buf);
    n_buffers = 1;

    start_buffer_ts = end_buffer_ts = GST_BUFFER_DTS_OR_PTS (buf);
    if (end_buffer_ts != GST_CLOCK_TIME_NONE
        && GST_BUFFER_DURATION_IS_VALID (buf))
      end_buffer_ts += GST_BUFFER_DURATION (buf);
  } else if (GST_IS_BUFFER_LIST (item)) {
    GstBufferList *buffer_list = GST_BUFFER_LIST_CAST (item);
    guint i;

    n_buffers = gst_buffer_list_length (buffer_list);

    for (i = 0; i < n_buffers; i++) {
      GstBuffer *tmp = gst_buffer_list_get (buffer_list, i);
      GstClockTime ts = GST_BUFFER_DTS_OR_PTS (tmp);

      buf_size += gst_buffer_get_size (tmp);

      if (ts != GST_CLOCK_TIME_NONE) {
        /* the first known timestamp is the start, the last known one
         * (plus its duration when valid) is the end */
        if (start_buffer_ts == GST_CLOCK_TIME_NONE)
          start_buffer_ts = ts;
        end_buffer_ts = ts;
        if (GST_BUFFER_DURATION_IS_VALID (tmp))
          end_buffer_ts += GST_BUFFER_DURATION (tmp);
      }
    }
  }

  priv->queued_bytes += buf_size;
  priv->queued_buffers += n_buffers;

  /* Update time level if working on a TIME segment */
  if (priv->last_segment.format == GST_FORMAT_TIME
      && end_buffer_ts != GST_CLOCK_TIME_NONE) {
    /* Clip to the last segment boundaries */
    if (priv->last_segment.stop != -1
        && end_buffer_ts > priv->last_segment.stop)
      end_buffer_ts = priv->last_segment.stop;
    else if (priv->last_segment.start > end_buffer_ts)
      end_buffer_ts = priv->last_segment.start;

    priv->last_in_running_time =
        gst_segment_to_running_time (&priv->last_segment, GST_FORMAT_TIME,
        end_buffer_ts);

    /* If this is the only buffer then we can directly update the queued time
     * here. This is especially useful if this was the first buffer because
     * otherwise we would have to wait until it is actually unqueued to know
     * the queued duration */
    if (gst_queue_array_get_length (priv->queue) == 1) {
      if (priv->last_segment.stop != -1
          && start_buffer_ts > priv->last_segment.stop)
        start_buffer_ts = priv->last_segment.stop;
      else if (priv->last_segment.start > start_buffer_ts)
        start_buffer_ts = priv->last_segment.start;

      priv->last_out_running_time =
          gst_segment_to_running_time (&priv->last_segment, GST_FORMAT_TIME,
          start_buffer_ts);
    }

    GST_TRACE_OBJECT (appsrc,
        "Last in running time %" GST_TIME_FORMAT ", last out running time %"
        GST_TIME_FORMAT, GST_TIME_ARGS (priv->last_in_running_time),
        GST_TIME_ARGS (priv->last_out_running_time));

    /* With both running times known the time level is their difference,
     * or zero when the output side is ahead of the input side */
    if (priv->last_out_running_time != GST_CLOCK_TIME_NONE
        && priv->last_in_running_time != GST_CLOCK_TIME_NONE) {
      if (priv->last_out_running_time > priv->last_in_running_time) {
        priv->queued_time = 0;
      } else {
        priv->queued_time =
            priv->last_in_running_time - priv->last_out_running_time;
      }
    }
  }

  GST_DEBUG_OBJECT (appsrc,
      "Currently queued: %" G_GUINT64_FORMAT " bytes, %" G_GUINT64_FORMAT
      " buffers, %" GST_TIME_FORMAT, priv->queued_bytes,
priv->queued_buffers, GST_TIME_ARGS (priv->queued_time));
}

/* Hands out the next queued item.
 *
 * First the duration of the base class segment is refreshed (and a
 * duration-changed message posted) when the configured size or duration
 * differs from it. Then, with the appsrc mutex held, queued caps and segment
 * events are processed until a buffer or buffer list can be returned. When
 * the queue is empty the application is asked for data and, unless we are
 * flushing or EOS, the function waits on the condition variable.
 *
 * A buffer is returned through @buf; a buffer list is submitted with
 * gst_base_src_submit_buffer_list() and @buf is set to NULL.
 *
 * Returns: GST_FLOW_OK, GST_FLOW_FLUSHING, GST_FLOW_EOS, or GST_FLOW_ERROR
 * when a seek or a queued segment was refused. */
static GstFlowReturn
gst_app_src_create (GstBaseSrc * bsrc, guint64 offset, guint size,
    GstBuffer ** buf)
{
  GstAppSrc *appsrc = GST_APP_SRC_CAST (bsrc);
  GstAppSrcPrivate *priv = appsrc->priv;
  GstFlowReturn ret;

  /* the object lock is released in every branch before posting a message */
  GST_OBJECT_LOCK (appsrc);
  if (G_UNLIKELY (priv->size != bsrc->segment.duration &&
          bsrc->segment.format == GST_FORMAT_BYTES)) {
    GST_DEBUG_OBJECT (appsrc,
        "Size changed from %" G_GINT64_FORMAT " to %" G_GINT64_FORMAT,
        bsrc->segment.duration, priv->size);
    bsrc->segment.duration = priv->size;
    GST_OBJECT_UNLOCK (appsrc);

    gst_element_post_message (GST_ELEMENT (appsrc),
        gst_message_new_duration_changed (GST_OBJECT (appsrc)));
  } else if (G_UNLIKELY (priv->duration != bsrc->segment.duration &&
          bsrc->segment.format == GST_FORMAT_TIME)) {
    GST_DEBUG_OBJECT (appsrc,
        "Duration changed from %" GST_TIME_FORMAT " to %" GST_TIME_FORMAT,
        GST_TIME_ARGS (bsrc->segment.duration), GST_TIME_ARGS (priv->duration));
    bsrc->segment.duration = priv->duration;
    GST_OBJECT_UNLOCK (appsrc);

    gst_element_post_message (GST_ELEMENT (appsrc),
        gst_message_new_duration_changed (GST_OBJECT (appsrc)));
  } else {
    GST_OBJECT_UNLOCK (appsrc);
  }

  g_mutex_lock (&priv->mutex);
  /* check flushing first */
  if (G_UNLIKELY (priv->flushing))
    goto flushing;

  if (priv->stream_type == GST_APP_STREAM_TYPE_RANDOM_ACCESS) {
    /* if we are dealing with a random-access stream, issue a seek if the offset
     * changed. */
    if (G_UNLIKELY (priv->offset != offset)) {
      gboolean res;

      /* do the seek */
      res = gst_app_src_emit_seek (appsrc, offset);

      if (G_UNLIKELY (!res))
        /* failing to seek is fatal */
        goto seek_error;

      priv->offset = offset;
      priv->is_eos = FALSE;
    }
  }

  while (TRUE) {
    /* return data as long as we have some */
    if (!gst_queue_array_is_empty (priv->queue)) {
      GstMiniObject *obj = gst_queue_array_pop_head (priv->queue);

      if (GST_IS_CAPS (obj)) {
        GstCaps *next_caps = GST_CAPS (obj);
        gboolean caps_changed = TRUE;

        if (next_caps && priv->current_caps)
          caps_changed = !gst_caps_is_equal (next_caps, priv->current_caps);
        else
          caps_changed = (next_caps != priv->current_caps);

        gst_caps_replace (&priv->current_caps, next_caps);

        /* drop the reference that was held by the queue */
        if (next_caps) {
          gst_caps_unref (next_caps);
        }

        if (caps_changed)
          gst_app_src_do_negotiate (bsrc);

        /* The mutex was released during negotiation, so in the meantime we
         * may have started flushing, new caps may have been queued, and the
         * queue content may have changed: check all of that again. */
        if (G_UNLIKELY (priv->flushing))
          goto flushing;

        /* Continue checks caps and queue */
        continue;
      }

      if (GST_IS_BUFFER (obj)) {
        GstBuffer *buffer = GST_BUFFER (obj);

        /* Mark the buffer as DISCONT if we previously dropped a buffer
         * instead of outputting it */
        if (priv->need_discont_downstream) {
          buffer = gst_buffer_make_writable (buffer);
          GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
          priv->need_discont_downstream = FALSE;
        }

        *buf = buffer;
      } else if (GST_IS_BUFFER_LIST (obj)) {
        GstBufferList *buffer_list;

        buffer_list = GST_BUFFER_LIST (obj);

        /* Mark the first buffer of the buffer list as DISCONT if we
         * previously dropped a buffer instead of outputting it */
        if (priv->need_discont_downstream) {
          GstBuffer *buffer;

          buffer_list = gst_buffer_list_make_writable (buffer_list);
          buffer = gst_buffer_list_get_writable (buffer_list, 0);
          GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
          priv->need_discont_downstream = FALSE;
        }

        /* buffer lists are handed to the base class directly, no single
         * buffer is returned in that case */
        gst_base_src_submit_buffer_list (bsrc, buffer_list);
        *buf = NULL;
      } else if (GST_IS_EVENT (obj)) {
        /* the event is parsed as a segment event; a result without a
         * segment is treated as a programming error */
        GstEvent *event = GST_EVENT (obj);
        const GstSegment *segment = NULL;

        gst_event_parse_segment (event, &segment);
        g_assert (segment != NULL);

        /* only a segment that differs from the current one is forwarded to
         * the base class */
        if (!gst_segment_is_equal (&priv->current_segment, segment)) {
          GST_DEBUG_OBJECT (appsrc,
              "Update new segment %" GST_PTR_FORMAT, event);
          if (!gst_base_src_new_segment (bsrc, segment)) {
            GST_ERROR_OBJECT (appsrc,
                "Couldn't set new segment %" GST_PTR_FORMAT, event);
            gst_event_unref (event);
            goto invalid_segment;
          }
          gst_segment_copy_into (segment, &priv->current_segment);
        }

        gst_event_unref (event);
        continue;
      } else {
        g_assert_not_reached ();
      }

      /* NOTE(review): the level accounting uses the pointer that was popped
       * from the queue, while the item handed out above may be a writable
       * copy of it; confirm the popped object is guaranteed to still be
       * alive at this point. */
      gst_app_src_update_queued_pop (appsrc, obj, TRUE);

      /* signal that we removed an item */
      if ((priv->wait_status & APP_WAITING))
        g_cond_broadcast (&priv->cond);

      /* see if we go lower than the min-percent */
      if (priv->min_percent) {
        if ((priv->max_bytes
                && priv->queued_bytes * 100 / priv->max_bytes <=
                priv->min_percent) || (priv->max_buffers
                && priv->queued_buffers * 100 / priv->max_buffers <=
                priv->min_percent) || (priv->max_time
                && priv->queued_time * 100 / priv->max_time <=
                priv->min_percent)) {
          /* ignore flushing state, we got a buffer and we will return it now.
           * Errors will be handled in the next round */
          gst_app_src_emit_need_data (appsrc, size);
        }
      }
      ret = GST_FLOW_OK;
      break;
    } else {
      gst_app_src_emit_need_data (appsrc, size);

      /* we can be flushing now because we released the lock above */
      if (G_UNLIKELY (priv->flushing))
        goto flushing;

      /* if we have a buffer now, continue the loop and try to return it. In
       * random-access mode (where a buffer is normally pushed in the above
       * signal) we can still be empty because the pushed buffer got flushed or
       * when the application pushes the requested buffer later, we support both
       * possibilities. */
      if (!gst_queue_array_is_empty (priv->queue))
        continue;

      /* no buffer yet, maybe we are EOS, if not, block for more data. */
    }

    /* check EOS */
    if (G_UNLIKELY (priv->is_eos))
      goto eos;

    /* nothing to return, wait a while for new data or flushing. */
    priv->wait_status |= STREAM_WAITING;
    g_cond_wait (&priv->cond, &priv->mutex);
    priv->wait_status &= ~STREAM_WAITING;
  }
  g_mutex_unlock (&priv->mutex);
  return ret;

  /* ERRORS */
flushing:
  {
    GST_DEBUG_OBJECT (appsrc, "we are flushing");
    g_mutex_unlock (&priv->mutex);
    return GST_FLOW_FLUSHING;
  }
eos:
  {
    GST_DEBUG_OBJECT (appsrc, "we are EOS");
    g_mutex_unlock (&priv->mutex);
    return GST_FLOW_EOS;
  }
seek_error:
  {
    g_mutex_unlock (&priv->mutex);
    GST_ELEMENT_ERROR (appsrc, RESOURCE, READ, ("failed to seek"),
        GST_ERROR_SYSTEM);
    return GST_FLOW_ERROR;
  }

invalid_segment:
  {
    g_mutex_unlock (&priv->mutex);
    GST_ELEMENT_ERROR (appsrc, LIBRARY, SETTINGS,
        (NULL), ("Failed to configure the provided input segment."));
    return GST_FLOW_ERROR;
  }
}

/* external API */

/**
 * gst_app_src_set_caps:
 * @appsrc: a #GstAppSrc
 * @caps: (nullable): caps to set
 *
 * Set the capabilities on the appsrc element.  This function takes
 * a copy of the caps structure. After calling this method, the source will
 * only produce caps that match @caps. @caps must be fixed and the caps on the
 * buffers must match the caps or be left NULL.
 *
 * Nothing happens when @caps equals the caps that were set last.
 */
void
gst_app_src_set_caps (GstAppSrc * appsrc, const GstCaps * caps)
{
  GstAppSrcPrivate *priv;
  gboolean caps_changed;

  g_return_if_fail (GST_IS_APP_SRC (appsrc));

  priv = appsrc->priv;

  /* lock order: appsrc mutex first, then the object lock */
  g_mutex_lock (&priv->mutex);

  GST_OBJECT_LOCK (appsrc);
  if (caps && priv->last_caps)
    caps_changed = !gst_caps_is_equal (caps, priv->last_caps);
  else
    caps_changed = (caps != priv->last_caps);

  if (caps_changed) {
    GstCaps *new_caps;
    gpointer t;

    new_caps = caps ? gst_caps_copy (caps) : NULL;
    GST_DEBUG_OBJECT (appsrc, "setting caps to %" GST_PTR_FORMAT, caps);

    /* caps entries at the tail of the queue have no data queued after them
     * and are superseded by the new caps: drop them */
    while ((t = gst_queue_array_peek_tail (priv->queue)) && GST_IS_CAPS (t)) {
      gst_caps_unref (gst_queue_array_pop_tail (priv->queue));
    }
    /* NOTE(review): for NULL @caps a NULL pointer is queued here, while
     * gst_app_src_create() dispatches on the type of each popped item;
     * confirm that a NULL entry is handled there. */
    gst_queue_array_push_tail (priv->queue, new_caps);
    gst_caps_replace (&priv->last_caps, new_caps);

    /* wake up the streaming thread if it is waiting for new queue items */
    if ((priv->wait_status & STREAM_WAITING))
      g_cond_broadcast (&priv->cond);
  }

  GST_OBJECT_UNLOCK (appsrc);

  g_mutex_unlock (&priv->mutex);
}

/**
 * gst_app_src_get_caps:
 * @appsrc: a #GstAppSrc
 *
 * Get the configured caps on @appsrc. These are the caps that were set last,
 * or NULL when none are set.
 *
 * Returns: the #GstCaps produced by the source. gst_caps_unref() after usage.
 */
GstCaps *
gst_app_src_get_caps (GstAppSrc * appsrc)
{
  GstCaps *caps;

  g_return_val_if_fail (GST_IS_APP_SRC (appsrc), NULL);

  /* the reference for the caller is taken under the object lock */
  GST_OBJECT_LOCK (appsrc);
  if ((caps = appsrc->priv->last_caps))
    gst_caps_ref (caps);
  GST_OBJECT_UNLOCK (appsrc);

  return caps;
}

/**
 * gst_app_src_set_size:
 * @appsrc: a #GstAppSrc
 * @size: the size to set
 *
 * Set the size of the stream in bytes. A value of -1 means that the size is
 * not known.
 */
void
gst_app_src_set_size (GstAppSrc * appsrc, gint64 size)
{
  GstAppSrcPrivate *priv;

  g_return_if_fail (GST_IS_APP_SRC (appsrc));

  priv = appsrc->priv;

  GST_OBJECT_LOCK (appsrc);
  GST_DEBUG_OBJECT (appsrc, "setting size of %" G_GINT64_FORMAT, size);
  priv->size = size;
  GST_OBJECT_UNLOCK (appsrc);
}

/**
 * gst_app_src_get_size:
 * @appsrc: a #GstAppSrc
 *
 * Get the size of the stream in bytes. A value of -1 means that the size is
 * not known.
* * Returns: the size of the stream previously set with gst_app_src_set_size(); */ gint64 gst_app_src_get_size (GstAppSrc * appsrc) { gint64 size; GstAppSrcPrivate *priv; g_return_val_if_fail (GST_IS_APP_SRC (appsrc), -1); priv = appsrc->priv; GST_OBJECT_LOCK (appsrc); size = priv->size; GST_DEBUG_OBJECT (appsrc, "getting size of %" G_GINT64_FORMAT, size); GST_OBJECT_UNLOCK (appsrc); return size; } /** * gst_app_src_set_duration: * @appsrc: a #GstAppSrc * @duration: the duration to set * * Set the duration of the stream in nanoseconds. A value of GST_CLOCK_TIME_NONE means that the duration is * not known. * * Since: 1.10 */ void gst_app_src_set_duration (GstAppSrc * appsrc, GstClockTime duration) { GstAppSrcPrivate *priv; g_return_if_fail (GST_IS_APP_SRC (appsrc)); priv = appsrc->priv; GST_OBJECT_LOCK (appsrc); GST_DEBUG_OBJECT (appsrc, "setting duration of %" GST_TIME_FORMAT, GST_TIME_ARGS (duration)); priv->duration = duration; GST_OBJECT_UNLOCK (appsrc); } /** * gst_app_src_get_duration: * @appsrc: a #GstAppSrc * * Get the duration of the stream in nanoseconds. A value of GST_CLOCK_TIME_NONE means that the duration is * not known. * * Returns: the duration of the stream previously set with gst_app_src_set_duration(); * * Since: 1.10 */ GstClockTime gst_app_src_get_duration (GstAppSrc * appsrc) { GstClockTime duration; GstAppSrcPrivate *priv; g_return_val_if_fail (GST_IS_APP_SRC (appsrc), GST_CLOCK_TIME_NONE); priv = appsrc->priv; GST_OBJECT_LOCK (appsrc); duration = priv->duration; GST_DEBUG_OBJECT (appsrc, "getting duration of %" GST_TIME_FORMAT, GST_TIME_ARGS (duration)); GST_OBJECT_UNLOCK (appsrc); return duration; } /** * gst_app_src_set_stream_type: * @appsrc: a #GstAppSrc * @type: the new state * * Set the stream type on @appsrc. For seekable streams, the "seek" signal must * be connected to. 
* * A stream_type stream */ void gst_app_src_set_stream_type (GstAppSrc * appsrc, GstAppStreamType type) { GstAppSrcPrivate *priv; g_return_if_fail (GST_IS_APP_SRC (appsrc)); priv = appsrc->priv; GST_OBJECT_LOCK (appsrc); GST_DEBUG_OBJECT (appsrc, "setting stream_type of %d", type); priv->stream_type = type; GST_OBJECT_UNLOCK (appsrc); } /** * gst_app_src_get_stream_type: * @appsrc: a #GstAppSrc * * Get the stream type. Control the stream type of @appsrc * with gst_app_src_set_stream_type(). * * Returns: the stream type. */ GstAppStreamType gst_app_src_get_stream_type (GstAppSrc * appsrc) { gboolean stream_type; GstAppSrcPrivate *priv; g_return_val_if_fail (GST_IS_APP_SRC (appsrc), FALSE); priv = appsrc->priv; GST_OBJECT_LOCK (appsrc); stream_type = priv->stream_type; GST_DEBUG_OBJECT (appsrc, "getting stream_type of %d", stream_type); GST_OBJECT_UNLOCK (appsrc); return stream_type; } /** * gst_app_src_set_max_bytes: * @appsrc: a #GstAppSrc * @max: the maximum number of bytes to queue * * Set the maximum amount of bytes that can be queued in @appsrc. * After the maximum amount of bytes are queued, @appsrc will emit the * "enough-data" signal. */ void gst_app_src_set_max_bytes (GstAppSrc * appsrc, guint64 max) { GstAppSrcPrivate *priv; g_return_if_fail (GST_IS_APP_SRC (appsrc)); priv = appsrc->priv; g_mutex_lock (&priv->mutex); if (max != priv->max_bytes) { GST_DEBUG_OBJECT (appsrc, "setting max-bytes to %" G_GUINT64_FORMAT, max); priv->max_bytes = max; /* signal the change */ g_cond_broadcast (&priv->cond); } g_mutex_unlock (&priv->mutex); } /** * gst_app_src_get_max_bytes: * @appsrc: a #GstAppSrc * * Get the maximum amount of bytes that can be queued in @appsrc. * * Returns: The maximum amount of bytes that can be queued. 
*/ guint64 gst_app_src_get_max_bytes (GstAppSrc * appsrc) { guint64 result; GstAppSrcPrivate *priv; g_return_val_if_fail (GST_IS_APP_SRC (appsrc), 0); priv = appsrc->priv; g_mutex_lock (&priv->mutex); result = priv->max_bytes; GST_DEBUG_OBJECT (appsrc, "getting max-bytes of %" G_GUINT64_FORMAT, result); g_mutex_unlock (&priv->mutex); return result; } /** * gst_app_src_get_current_level_bytes: * @appsrc: a #GstAppSrc * * Get the number of currently queued bytes inside @appsrc. * * Returns: The number of currently queued bytes. * * Since: 1.2 */ guint64 gst_app_src_get_current_level_bytes (GstAppSrc * appsrc) { guint64 queued; GstAppSrcPrivate *priv; g_return_val_if_fail (GST_IS_APP_SRC (appsrc), -1); priv = appsrc->priv; GST_OBJECT_LOCK (appsrc); queued = priv->queued_bytes; GST_DEBUG_OBJECT (appsrc, "current level bytes is %" G_GUINT64_FORMAT, queued); GST_OBJECT_UNLOCK (appsrc); return queued; } /** * gst_app_src_set_max_buffers: * @appsrc: a #GstAppSrc * @max: the maximum number of buffers to queue * * Set the maximum amount of buffers that can be queued in @appsrc. * After the maximum amount of buffers are queued, @appsrc will emit the * "enough-data" signal. * * Since: 1.20 */ void gst_app_src_set_max_buffers (GstAppSrc * appsrc, guint64 max) { GstAppSrcPrivate *priv; g_return_if_fail (GST_IS_APP_SRC (appsrc)); priv = appsrc->priv; g_mutex_lock (&priv->mutex); if (max != priv->max_buffers) { GST_DEBUG_OBJECT (appsrc, "setting max-buffers to %" G_GUINT64_FORMAT, max); priv->max_buffers = max; /* signal the change */ g_cond_broadcast (&priv->cond); } g_mutex_unlock (&priv->mutex); } /** * gst_app_src_get_max_buffers: * @appsrc: a #GstAppSrc * * Get the maximum amount of buffers that can be queued in @appsrc. * * Returns: The maximum amount of buffers that can be queued. 
* * Since: 1.20 */ guint64 gst_app_src_get_max_buffers (GstAppSrc * appsrc) { guint64 result; GstAppSrcPrivate *priv; g_return_val_if_fail (GST_IS_APP_SRC (appsrc), 0); priv = appsrc->priv; g_mutex_lock (&priv->mutex); result = priv->max_buffers; GST_DEBUG_OBJECT (appsrc, "getting max-buffers of %" G_GUINT64_FORMAT, result); g_mutex_unlock (&priv->mutex); return result; } /** * gst_app_src_get_current_level_buffers: * @appsrc: a #GstAppSrc * * Get the number of currently queued buffers inside @appsrc. * * Returns: The number of currently queued buffers. * * Since: 1.20 */ guint64 gst_app_src_get_current_level_buffers (GstAppSrc * appsrc) { guint64 queued; GstAppSrcPrivate *priv; g_return_val_if_fail (GST_IS_APP_SRC (appsrc), -1); priv = appsrc->priv; GST_OBJECT_LOCK (appsrc); queued = priv->queued_buffers; GST_DEBUG_OBJECT (appsrc, "current level buffers is %" G_GUINT64_FORMAT, queued); GST_OBJECT_UNLOCK (appsrc); return queued; } /** * gst_app_src_set_max_time: * @appsrc: a #GstAppSrc * @max: the maximum amonut of time to queue * * Set the maximum amount of time that can be queued in @appsrc. * After the maximum amount of time are queued, @appsrc will emit the * "enough-data" signal. * * Since: 1.20 */ void gst_app_src_set_max_time (GstAppSrc * appsrc, GstClockTime max) { GstAppSrcPrivate *priv; g_return_if_fail (GST_IS_APP_SRC (appsrc)); priv = appsrc->priv; g_mutex_lock (&priv->mutex); if (max != priv->max_time) { GST_DEBUG_OBJECT (appsrc, "setting max-time to %" GST_TIME_FORMAT, GST_TIME_ARGS (max)); priv->max_time = max; /* signal the change */ g_cond_broadcast (&priv->cond); } g_mutex_unlock (&priv->mutex); } /** * gst_app_src_get_max_time: * @appsrc: a #GstAppSrc * * Get the maximum amount of time that can be queued in @appsrc. * * Returns: The maximum amount of time that can be queued. 
* * Since: 1.20 */ GstClockTime gst_app_src_get_max_time (GstAppSrc * appsrc) { GstClockTime result; GstAppSrcPrivate *priv; g_return_val_if_fail (GST_IS_APP_SRC (appsrc), 0); priv = appsrc->priv; g_mutex_lock (&priv->mutex); result = priv->max_time; GST_DEBUG_OBJECT (appsrc, "getting max-time of %" GST_TIME_FORMAT, GST_TIME_ARGS (result)); g_mutex_unlock (&priv->mutex); return result; } /** * gst_app_src_get_current_level_time: * @appsrc: a #GstAppSrc * * Get the amount of currently queued time inside @appsrc. * * Returns: The amount of currently queued time. * * Since: 1.20 */ GstClockTime gst_app_src_get_current_level_time (GstAppSrc * appsrc) { gint64 queued; GstAppSrcPrivate *priv; g_return_val_if_fail (GST_IS_APP_SRC (appsrc), GST_CLOCK_TIME_NONE); priv = appsrc->priv; GST_OBJECT_LOCK (appsrc); queued = priv->queued_time; GST_DEBUG_OBJECT (appsrc, "current level time is %" GST_TIME_FORMAT, GST_TIME_ARGS (queued)); GST_OBJECT_UNLOCK (appsrc); return queued; } static void gst_app_src_set_latencies (GstAppSrc * appsrc, gboolean do_min, guint64 min, gboolean do_max, guint64 max) { GstAppSrcPrivate *priv = appsrc->priv; gboolean changed = FALSE; g_mutex_lock (&priv->mutex); if (do_min && priv->min_latency != min) { priv->min_latency = min; changed = TRUE; } if (do_max && priv->max_latency != max) { priv->max_latency = max; changed = TRUE; } g_mutex_unlock (&priv->mutex); if (changed) { GST_DEBUG_OBJECT (appsrc, "posting latency changed"); gst_element_post_message (GST_ELEMENT_CAST (appsrc), gst_message_new_latency (GST_OBJECT_CAST (appsrc))); } } /** * gst_app_src_set_leaky_type: * @appsrc: a #GstAppSrc * @leaky: the #GstAppLeakyType * * When set to any other value than GST_APP_LEAKY_TYPE_NONE then the appsrc * will drop any buffers that are pushed into it once its internal queue is * full. The selected type defines whether to drop the oldest or new * buffers. 
* * Since: 1.20 */ void gst_app_src_set_leaky_type (GstAppSrc * appsrc, GstAppLeakyType leaky) { g_return_if_fail (GST_IS_APP_SRC (appsrc)); appsrc->priv->leaky_type = leaky; } /** * gst_app_src_get_leaky_type: * @appsrc: a #GstAppSrc * * Returns the currently set #GstAppLeakyType. See gst_app_src_set_leaky_type() * for more details. * * Returns: The currently set #GstAppLeakyType. * * Since: 1.20 */ GstAppLeakyType gst_app_src_get_leaky_type (GstAppSrc * appsrc) { g_return_val_if_fail (GST_IS_APP_SRC (appsrc), GST_APP_LEAKY_TYPE_NONE); return appsrc->priv->leaky_type; } /** * gst_app_src_set_latency: * @appsrc: a #GstAppSrc * @min: the min latency * @max: the max latency * * Configure the @min and @max latency in @src. If @min is set to -1, the * default latency calculations for pseudo-live sources will be used. */ void gst_app_src_set_latency (GstAppSrc * appsrc, guint64 min, guint64 max) { gst_app_src_set_latencies (appsrc, TRUE, min, TRUE, max); } /** * gst_app_src_get_latency: * @appsrc: a #GstAppSrc * @min: (out): the min latency * @max: (out): the max latency * * Retrieve the min and max latencies in @min and @max respectively. */ void gst_app_src_get_latency (GstAppSrc * appsrc, guint64 * min, guint64 * max) { GstAppSrcPrivate *priv; g_return_if_fail (GST_IS_APP_SRC (appsrc)); priv = appsrc->priv; g_mutex_lock (&priv->mutex); if (min) *min = priv->min_latency; if (max) *max = priv->max_latency; g_mutex_unlock (&priv->mutex); } /** * gst_app_src_set_emit_signals: * @appsrc: a #GstAppSrc * @emit: the new state * * Make appsrc emit the "new-preroll" and "new-buffer" signals. This option is * by default disabled because signal emission is expensive and unneeded when * the application prefers to operate in pull mode. 
*/ void gst_app_src_set_emit_signals (GstAppSrc * appsrc, gboolean emit) { GstAppSrcPrivate *priv; g_return_if_fail (GST_IS_APP_SRC (appsrc)); priv = appsrc->priv; g_mutex_lock (&priv->mutex); priv->emit_signals = emit; g_mutex_unlock (&priv->mutex); } /** * gst_app_src_get_emit_signals: * @appsrc: a #GstAppSrc * * Check if appsrc will emit the "new-preroll" and "new-buffer" signals. * * Returns: %TRUE if @appsrc is emitting the "new-preroll" and "new-buffer" * signals. */ gboolean gst_app_src_get_emit_signals (GstAppSrc * appsrc) { gboolean result; GstAppSrcPrivate *priv; g_return_val_if_fail (GST_IS_APP_SRC (appsrc), FALSE); priv = appsrc->priv; g_mutex_lock (&priv->mutex); result = priv->emit_signals; g_mutex_unlock (&priv->mutex); return result; } static GstFlowReturn gst_app_src_push_internal (GstAppSrc * appsrc, GstBuffer * buffer, GstBufferList * buflist, gboolean steal_ref) { gboolean first = TRUE; GstAppSrcPrivate *priv; g_return_val_if_fail (GST_IS_APP_SRC (appsrc), GST_FLOW_ERROR); priv = appsrc->priv; if (buffer != NULL) g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR); else g_return_val_if_fail (GST_IS_BUFFER_LIST (buflist), GST_FLOW_ERROR); if (buflist != NULL) { if (gst_buffer_list_length (buflist) == 0) return GST_FLOW_OK; buffer = gst_buffer_list_get (buflist, 0); } if (GST_BUFFER_DTS (buffer) == GST_CLOCK_TIME_NONE && GST_BUFFER_PTS (buffer) == GST_CLOCK_TIME_NONE && gst_base_src_get_do_timestamp (GST_BASE_SRC_CAST (appsrc))) { GstClock *clock; clock = gst_element_get_clock (GST_ELEMENT_CAST (appsrc)); if (clock) { GstClockTime now; GstClockTime base_time = gst_element_get_base_time (GST_ELEMENT_CAST (appsrc)); now = gst_clock_get_time (clock); if (now > base_time) now -= base_time; else now = 0; gst_object_unref (clock); if (buflist == NULL) { if (!steal_ref) { buffer = gst_buffer_copy (buffer); steal_ref = TRUE; } else { buffer = gst_buffer_make_writable (buffer); } } else { if (!steal_ref) { buflist = gst_buffer_list_copy 
(buflist); steal_ref = TRUE; } else { buflist = gst_buffer_list_make_writable (buflist); } buffer = gst_buffer_list_get_writable (buflist, 0); } GST_BUFFER_PTS (buffer) = now; GST_BUFFER_DTS (buffer) = now; } else { GST_WARNING_OBJECT (appsrc, "do-timestamp=TRUE but buffers are provided before " "reaching the PLAYING state and having a clock. Timestamps will " "not be accurate!"); } } g_mutex_lock (&priv->mutex); while (TRUE) { /* can't accept buffers when we are flushing or EOS */ if (priv->flushing) goto flushing; if (priv->is_eos) goto eos; if ((priv->max_bytes && priv->queued_bytes >= priv->max_bytes) || (priv->max_buffers && priv->queued_buffers >= priv->max_buffers) || (priv->max_time && priv->queued_time >= priv->max_time)) { GST_DEBUG_OBJECT (appsrc, "queue filled (queued %" G_GUINT64_FORMAT " bytes, max %" G_GUINT64_FORMAT " bytes, " "queued %" G_GUINT64_FORMAT " buffers, max %" G_GUINT64_FORMAT " buffers, " "queued %" GST_TIME_FORMAT " time, max %" GST_TIME_FORMAT " time)", priv->queued_bytes, priv->max_bytes, priv->queued_buffers, priv->max_buffers, GST_TIME_ARGS (priv->queued_time), GST_TIME_ARGS (priv->max_time)); if (priv->leaky_type == GST_APP_LEAKY_TYPE_UPSTREAM) { priv->need_discont_upstream = TRUE; goto dropped; } else if (priv->leaky_type == GST_APP_LEAKY_TYPE_DOWNSTREAM) { guint i, length = gst_queue_array_get_length (priv->queue); GstMiniObject *item = NULL; /* Find the oldest buffer or buffer list and drop it, then update the * limits. Dropping one is sufficient to go below the limits again. 
*/ for (i = 0; i < length; i++) { item = gst_queue_array_peek_nth (priv->queue, i); if (GST_IS_BUFFER (item) || GST_IS_BUFFER_LIST (item)) { gst_queue_array_drop_element (priv->queue, i); break; } /* To not accidentally have an event after the loop */ item = NULL; } if (!item) { GST_FIXME_OBJECT (appsrc, "No buffer or buffer list queued but queue is full"); /* This shouldn't really happen but in this case we can't really do * anything apart from accepting the buffer / bufferlist */ break; } GST_WARNING_OBJECT (appsrc, "Dropping old item %" GST_PTR_FORMAT, item); gst_app_src_update_queued_pop (appsrc, item, FALSE); gst_mini_object_unref (item); priv->need_discont_downstream = TRUE; continue; } if (first) { Callbacks *callbacks = NULL; gboolean emit; emit = priv->emit_signals; if (priv->callbacks) callbacks = callbacks_ref (priv->callbacks); /* only signal on the first push */ g_mutex_unlock (&priv->mutex); if (callbacks && callbacks->callbacks.enough_data) callbacks->callbacks.enough_data (appsrc, callbacks->user_data); else if (emit) g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_ENOUGH_DATA], 0, NULL); g_clear_pointer (&callbacks, callbacks_unref); g_mutex_lock (&priv->mutex); /* continue to check for flushing/eos after releasing the lock */ first = FALSE; continue; } if (priv->block) { GST_DEBUG_OBJECT (appsrc, "waiting for free space"); /* we are filled, wait until a buffer gets popped or when we * flush. */ priv->wait_status |= APP_WAITING; g_cond_wait (&priv->cond, &priv->mutex); priv->wait_status &= ~APP_WAITING; } else { /* no need to wait for free space, we just pump more data into the * queue hoping that the caller reacts to the enough-data signal and * stops pushing buffers. 
*/ break; } } else { break; } } if (priv->pending_custom_segment) { GstEvent *event = gst_event_new_segment (&priv->last_segment); GST_DEBUG_OBJECT (appsrc, "enqueue new segment %" GST_PTR_FORMAT, event); gst_queue_array_push_tail (priv->queue, event); priv->pending_custom_segment = FALSE; } if (buflist != NULL) { /* Mark the first buffer of the buffer list as DISCONT if we previously * dropped a buffer instead of queueing it */ if (priv->need_discont_upstream) { if (!steal_ref) { buflist = gst_buffer_list_copy (buflist); steal_ref = TRUE; } else { buflist = gst_buffer_list_make_writable (buflist); } buffer = gst_buffer_list_get_writable (buflist, 0); GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT); priv->need_discont_upstream = FALSE; } GST_DEBUG_OBJECT (appsrc, "queueing buffer list %p", buflist); if (!steal_ref) gst_buffer_list_ref (buflist); gst_queue_array_push_tail (priv->queue, buflist); } else { /* Mark the buffer as DISCONT if we previously dropped a buffer instead of * queueing it */ if (priv->need_discont_upstream) { if (!steal_ref) { buffer = gst_buffer_copy (buffer); steal_ref = TRUE; } else { buffer = gst_buffer_make_writable (buffer); } GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT); priv->need_discont_upstream = FALSE; } GST_DEBUG_OBJECT (appsrc, "queueing buffer %p", buffer); if (!steal_ref) gst_buffer_ref (buffer); gst_queue_array_push_tail (priv->queue, buffer); } gst_app_src_update_queued_push (appsrc, buflist ? 
GST_MINI_OBJECT_CAST (buflist) : GST_MINI_OBJECT_CAST (buffer)); if ((priv->wait_status & STREAM_WAITING)) g_cond_broadcast (&priv->cond); g_mutex_unlock (&priv->mutex); return GST_FLOW_OK; /* ERRORS */ flushing: { GST_DEBUG_OBJECT (appsrc, "refuse buffer %p, we are flushing", buffer); if (steal_ref) { if (buflist) gst_buffer_list_unref (buflist); else gst_buffer_unref (buffer); } g_mutex_unlock (&priv->mutex); return GST_FLOW_FLUSHING; } eos: { GST_DEBUG_OBJECT (appsrc, "refuse buffer %p, we are EOS", buffer); if (steal_ref) { if (buflist) gst_buffer_list_unref (buflist); else gst_buffer_unref (buffer); } g_mutex_unlock (&priv->mutex); return GST_FLOW_EOS; } dropped: { GST_DEBUG_OBJECT (appsrc, "dropped new buffer %p, we are full", buffer); if (steal_ref) { if (buflist) gst_buffer_list_unref (buflist); else gst_buffer_unref (buffer); } g_mutex_unlock (&priv->mutex); return GST_FLOW_EOS; } } static GstFlowReturn gst_app_src_push_buffer_full (GstAppSrc * appsrc, GstBuffer * buffer, gboolean steal_ref) { return gst_app_src_push_internal (appsrc, buffer, NULL, steal_ref); } static GstFlowReturn gst_app_src_push_sample_internal (GstAppSrc * appsrc, GstSample * sample) { GstAppSrcPrivate *priv = appsrc->priv; GstBufferList *buffer_list; GstBuffer *buffer; GstCaps *caps; g_return_val_if_fail (GST_IS_SAMPLE (sample), GST_FLOW_ERROR); caps = gst_sample_get_caps (sample); if (caps != NULL) { gst_app_src_set_caps (appsrc, caps); } else { GST_WARNING_OBJECT (appsrc, "received sample without caps"); } if (priv->handle_segment_change && priv->format == GST_FORMAT_TIME) { GstSegment *segment = gst_sample_get_segment (sample); if (segment->format != GST_FORMAT_TIME) { GST_LOG_OBJECT (appsrc, "format %s is not supported", gst_format_get_name (segment->format)); goto handle_buffer; } g_mutex_lock (&priv->mutex); if (gst_segment_is_equal (&priv->last_segment, segment)) { GST_LOG_OBJECT (appsrc, "segment wasn't changed"); g_mutex_unlock (&priv->mutex); goto handle_buffer; } /* will 
be pushed to queue with next buffer/buffer-list */ gst_segment_copy_into (segment, &priv->last_segment); priv->pending_custom_segment = TRUE; g_mutex_unlock (&priv->mutex); } handle_buffer: buffer = gst_sample_get_buffer (sample); if (buffer != NULL) return gst_app_src_push_buffer_full (appsrc, buffer, FALSE); buffer_list = gst_sample_get_buffer_list (sample); if (buffer_list != NULL) return gst_app_src_push_internal (appsrc, NULL, buffer_list, FALSE); GST_WARNING_OBJECT (appsrc, "received sample without buffer or buffer list"); return GST_FLOW_OK; } /** * gst_app_src_push_buffer: * @appsrc: a #GstAppSrc * @buffer: (transfer full): a #GstBuffer to push * * Adds a buffer to the queue of buffers that the appsrc element will * push to its source pad. This function takes ownership of the buffer. * * When the block property is TRUE, this function can block until free * space becomes available in the queue. * * Returns: #GST_FLOW_OK when the buffer was successfully queued. * #GST_FLOW_FLUSHING when @appsrc is not PAUSED or PLAYING. * #GST_FLOW_EOS when EOS occurred. */ GstFlowReturn gst_app_src_push_buffer (GstAppSrc * appsrc, GstBuffer * buffer) { return gst_app_src_push_buffer_full (appsrc, buffer, TRUE); } /** * gst_app_src_push_buffer_list: * @appsrc: a #GstAppSrc * @buffer_list: (transfer full): a #GstBufferList to push * * Adds a buffer list to the queue of buffers and buffer lists that the * appsrc element will push to its source pad. This function takes ownership * of @buffer_list. * * When the block property is TRUE, this function can block until free * space becomes available in the queue. * * Returns: #GST_FLOW_OK when the buffer list was successfully queued. * #GST_FLOW_FLUSHING when @appsrc is not PAUSED or PLAYING. * #GST_FLOW_EOS when EOS occurred. 
* * Since: 1.14 */ GstFlowReturn gst_app_src_push_buffer_list (GstAppSrc * appsrc, GstBufferList * buffer_list) { return gst_app_src_push_internal (appsrc, NULL, buffer_list, TRUE); } /** * gst_app_src_push_sample: * @appsrc: a #GstAppSrc * @sample: (transfer none): a #GstSample from which buffer and caps may be * extracted * * Extract a buffer from the provided sample and adds it to the queue of * buffers that the appsrc element will push to its source pad. Any * previous caps that were set on appsrc will be replaced by the caps * associated with the sample if not equal. * * This function does not take ownership of the * sample so the sample needs to be unreffed after calling this function. * * When the block property is TRUE, this function can block until free * space becomes available in the queue. * * Returns: #GST_FLOW_OK when the buffer was successfully queued. * #GST_FLOW_FLUSHING when @appsrc is not PAUSED or PLAYING. * #GST_FLOW_EOS when EOS occurred. * * Since: 1.6 * */ GstFlowReturn gst_app_src_push_sample (GstAppSrc * appsrc, GstSample * sample) { return gst_app_src_push_sample_internal (appsrc, sample); } /* push a buffer without stealing the ref of the buffer. This is used for the * action signal. */ static GstFlowReturn gst_app_src_push_buffer_action (GstAppSrc * appsrc, GstBuffer * buffer) { return gst_app_src_push_buffer_full (appsrc, buffer, FALSE); } /* push a buffer list without stealing the ref of the buffer list. This is * used for the action signal. */ static GstFlowReturn gst_app_src_push_buffer_list_action (GstAppSrc * appsrc, GstBufferList * buffer_list) { return gst_app_src_push_internal (appsrc, NULL, buffer_list, FALSE); } /* push a sample without stealing the ref. This is used for the * action signal. 
*/
static GstFlowReturn
gst_app_src_push_sample_action (GstAppSrc * appsrc, GstSample * sample)
{
  GstFlowReturn ret;

  ret = gst_app_src_push_sample_internal (appsrc, sample);

  return ret;
}

/**
 * gst_app_src_end_of_stream:
 * @appsrc: a #GstAppSrc
 *
 * Indicates to the appsrc element that the last buffer queued in the
 * element is the last buffer of the stream.
 *
 * Returns: #GST_FLOW_OK when the EOS was successfully queued.
 * #GST_FLOW_FLUSHING when @appsrc is not PAUSED or PLAYING.
 */
GstFlowReturn
gst_app_src_end_of_stream (GstAppSrc * appsrc)
{
  GstAppSrcPrivate *self;

  g_return_val_if_fail (GST_IS_APP_SRC (appsrc), GST_FLOW_ERROR);

  self = appsrc->priv;

  g_mutex_lock (&self->mutex);

  /* EOS is refused while flushing. It is accepted when EOS was already
   * signalled before, although that has no further effect. */
  if (self->flushing) {
    g_mutex_unlock (&self->mutex);
    GST_DEBUG_OBJECT (appsrc, "refuse EOS, we are flushing");
    return GST_FLOW_FLUSHING;
  }

  GST_DEBUG_OBJECT (appsrc, "sending EOS");
  self->is_eos = TRUE;
  /* wake up everybody waiting on the condition */
  g_cond_broadcast (&self->cond);
  g_mutex_unlock (&self->mutex);

  return GST_FLOW_OK;
}

/**
 * gst_app_src_set_callbacks: (skip)
 * @appsrc: a #GstAppSrc
 * @callbacks: the callbacks
 * @user_data: a user_data argument for the callbacks
 * @notify: a destroy notify function
 *
 * Set callbacks which will be executed when data is needed, enough data has
 * been collected or when a seek should be performed.
 * This is an alternative to using the signals, it has lower overhead and is thus
 * less expensive, but also less flexible.
 *
 * If callbacks are installed, no signals will be emitted for performance
 * reasons.
 *
 * Before 1.16.3 it was not possible to change the callbacks in a thread-safe
 * way.
*/ void gst_app_src_set_callbacks (GstAppSrc * appsrc, GstAppSrcCallbacks * callbacks, gpointer user_data, GDestroyNotify notify) { Callbacks *old_callbacks, *new_callbacks = NULL; GstAppSrcPrivate *priv; g_return_if_fail (GST_IS_APP_SRC (appsrc)); g_return_if_fail (callbacks != NULL); priv = appsrc->priv; if (callbacks) { new_callbacks = g_new0 (Callbacks, 1); new_callbacks->callbacks = *callbacks; new_callbacks->user_data = user_data; new_callbacks->destroy_notify = notify; new_callbacks->ref_count = 1; } g_mutex_lock (&priv->mutex); old_callbacks = g_steal_pointer (&priv->callbacks); priv->callbacks = g_steal_pointer (&new_callbacks); g_mutex_unlock (&priv->mutex); g_clear_pointer (&old_callbacks, callbacks_unref); } /*** GSTURIHANDLER INTERFACE *************************************************/ static GstURIType gst_app_src_uri_get_type (GType type) { return GST_URI_SRC; } static const gchar *const * gst_app_src_uri_get_protocols (GType type) { static const gchar *protocols[] = { "appsrc", NULL }; return protocols; } static gchar * gst_app_src_uri_get_uri (GstURIHandler * handler) { GstAppSrc *appsrc = GST_APP_SRC (handler); return appsrc->priv->uri ? g_strdup (appsrc->priv->uri) : NULL; } static gboolean gst_app_src_uri_set_uri (GstURIHandler * handler, const gchar * uri, GError ** error) { GstAppSrc *appsrc = GST_APP_SRC (handler); g_free (appsrc->priv->uri); appsrc->priv->uri = uri ? 
g_strdup (uri) : NULL; return TRUE; } static void gst_app_src_uri_handler_init (gpointer g_iface, gpointer iface_data) { GstURIHandlerInterface *iface = (GstURIHandlerInterface *) g_iface; iface->get_type = gst_app_src_uri_get_type; iface->get_protocols = gst_app_src_uri_get_protocols; iface->get_uri = gst_app_src_uri_get_uri; iface->set_uri = gst_app_src_uri_set_uri; } static gboolean gst_app_src_event (GstBaseSrc * src, GstEvent * event) { GstAppSrc *appsrc = GST_APP_SRC_CAST (src); GstAppSrcPrivate *priv = appsrc->priv; switch (GST_EVENT_TYPE (event)) { case GST_EVENT_FLUSH_STOP: g_mutex_lock (&priv->mutex); priv->is_eos = FALSE; g_mutex_unlock (&priv->mutex); break; default: break; } return GST_BASE_SRC_CLASS (parent_class)->event (src, event); }