diff --git a/docs/libs/gstreamer-libs-sections.txt b/docs/libs/gstreamer-libs-sections.txt index 7a811e71c1..d3b357ef25 100644 --- a/docs/libs/gstreamer-libs-sections.txt +++ b/docs/libs/gstreamer-libs-sections.txt @@ -131,6 +131,7 @@ GstBaseSrcFlags gst_base_src_is_live gst_base_src_set_live +gst_base_src_set_format GST_BASE_SRC_PAD @@ -325,6 +326,12 @@ THREAD_TEST_RUNNING GST_START_TEST GST_END_TEST +sync_cond +mutex +GST_CAT_DEFAULT +thread_list +start_cond + fail_unless_equals_int fail_unless_equals_string fail_unless_equals_uint64 diff --git a/libs/gst/base/gstbasesrc.c b/libs/gst/base/gstbasesrc.c index 44f19329d1..179a108bae 100644 --- a/libs/gst/base/gstbasesrc.c +++ b/libs/gst/base/gstbasesrc.c @@ -23,21 +23,68 @@ /** * SECTION:gstbasesrc * @short_description: Base class for getrange based source elements - * @see_also: #GstBaseTransform, #GstBaseSink - * - * This class is mostly useful for elements that do byte based - * access to a random access resource, like files. - * If random access is not possible, the live-mode should be set - * to TRUE. + * @see_also: #GstPushSrc, #GstBaseTransform, #GstBaseSink * + * This is a generice base class for source elements. The following + * types of sources are supported: * - * one source pad - * handles state changes - * does flushing - * preroll with optional preview - * pull/push mode - * EOS handling + * random access sources like files + * seekable sources + * live sources * + * + * The source can be configured to operate in a any #GstFormat with the + * gst_base_src_set_format(). This format determines the format of the + * internal #GstSegment and the #GST_EVENT_NEW_SEGMENT. The default format for + * #GstBaseSrc is GST_FORMAT_BYTES. + * + * #GstBaseSrc always supports the push mode scheduling. If the following + * conditions are met, it also supports pull mode scheduling: + * + * the format is set to GST_FORMAT_BYTES (default). + * #GstBaseSrc::is_seekable returns TRUE. + * + * + * If all the conditions are met for operating in pull mode, #GstBaseSrc is + * automatically seekable in push mode as well. The following conditions must be + * met to make the element seekable in push mode when the format is not + * GST_FORMAT_BYTES: + * + * + * #GstBaseSrc::is_seekable returns TRUE. + * + * + * #GstBaseSrc::query can convert all supported seek formats to the + * internal format as set with gst_base_src_set_format(). + * + * + * #GstBaseSrc::do_seek is implemented, performs the seek and returns TRUE. + * + * + * + * When the default format is not GST_FORMAT_BYTES, the element should ignore the + * offset and length in the #GstBaseSrc::create method. It is recommended to subclass + * #GstPushSrc instead in this situation. + * + * #GstBaseSrc has support for live sources. Live sources are sources that produce + * data at a fixed rate, such as audio or video capture. A typical live source also + * provides a clock to export the rate at which they produce data. + * Use gst_base_src_set_live() to activate the live source mode. + * + * A live source does not produce data in the PAUSED state, this means that the + * #GstBaseSrc::create method will not be called in PAUSED but only in PLAYING. + * To signal the pipeline that the element will not produce data, its return + * value from the READY to PAUSED state will be GST_STATE_CHANGE_NO_PREROLL. + * + * A typical live source will timestamp the buffers they create with the current + * stream time of the pipeline. This is why they can only produce data in PLAYING, + * when the clock is actually distributed and running. + * + * The #GstBaseSrc::get_times method can be used to implement pseudo-live sources. + * The base source will wait for the specified stream time returned in + * #GstBaseSrc::get_times before pushing out the buffer. + * + * Last reviewed on 2005-12-10 (0.10.0) */ #include @@ -52,9 +99,6 @@ #include #include -#define DEFAULT_BLOCKSIZE 4096 -#define DEFAULT_NUM_BUFFERS -1 - GST_DEBUG_CATEGORY_STATIC (gst_base_src_debug); #define GST_CAT_DEFAULT gst_base_src_debug @@ -76,11 +120,16 @@ enum LAST_SIGNAL }; +#define DEFAULT_BLOCKSIZE 4096 +#define DEFAULT_NUM_BUFFERS -1 +#define DEFAULT_TYPEFIND FALSE + enum { PROP_0, PROP_BLOCKSIZE, PROP_NUM_BUFFERS, + PROP_TYPEFIND, }; static GstElementClass *parent_class = NULL; @@ -125,14 +174,16 @@ static void gst_base_src_get_property (GObject * object, guint prop_id, GValue * value, GParamSpec * pspec); static gboolean gst_base_src_event_handler (GstPad * pad, GstEvent * event); static gboolean gst_base_src_send_event (GstElement * elem, GstEvent * event); +static gboolean gst_base_src_default_event (GstBaseSrc * src, GstEvent * event); static gboolean gst_base_src_query (GstPad * pad, GstQuery * query); static gboolean gst_base_src_default_negotiate (GstBaseSrc * basesrc); -static gboolean gst_base_src_default_newsegment (GstBaseSrc * src); +static gboolean gst_base_src_default_do_seek (GstBaseSrc * src, + GstSegment * segment); +static gboolean gst_base_src_default_query (GstBaseSrc * src, GstQuery * query); static gboolean gst_base_src_unlock (GstBaseSrc * basesrc); -static gboolean gst_base_src_get_size (GstBaseSrc * basesrc, guint64 * size); static gboolean gst_base_src_start (GstBaseSrc * basesrc); static gboolean gst_base_src_stop (GstBaseSrc * basesrc); @@ -171,18 +222,23 @@ gst_base_src_class_init (GstBaseSrcClass * klass) g_param_spec_ulong ("blocksize", "Block size", "Size in bytes to read per buffer", 1, G_MAXULONG, DEFAULT_BLOCKSIZE, G_PARAM_READWRITE)); - g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_NUM_BUFFERS, g_param_spec_int ("num-buffers", "num-buffers", "Number of buffers to output before sending EOS", -1, G_MAXINT, DEFAULT_NUM_BUFFERS, G_PARAM_READWRITE)); + g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_TYPEFIND, + g_param_spec_boolean ("typefind", "Typefind", + "Run typefind before negotiating", DEFAULT_TYPEFIND, + G_PARAM_READWRITE)); gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_base_src_change_state); gstelement_class->send_event = GST_DEBUG_FUNCPTR (gst_base_src_send_event); - klass->negotiate = gst_base_src_default_negotiate; - klass->newsegment = gst_base_src_default_newsegment; + klass->negotiate = GST_DEBUG_FUNCPTR (gst_base_src_default_negotiate); + klass->event = GST_DEBUG_FUNCPTR (gst_base_src_default_event); + klass->do_seek = GST_DEBUG_FUNCPTR (gst_base_src_default_do_seek); + klass->query = GST_DEBUG_FUNCPTR (gst_base_src_default_query); } static void @@ -229,7 +285,9 @@ gst_base_src_init (GstBaseSrc * basesrc, gpointer g_class) basesrc->blocksize = DEFAULT_BLOCKSIZE; basesrc->clock_id = NULL; - gst_segment_init (&basesrc->segment, GST_FORMAT_BYTES); + /* we operate in BYTES by default */ + gst_base_src_set_format (basesrc, GST_FORMAT_BYTES); + basesrc->ABI.typefind = DEFAULT_TYPEFIND; GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_STARTED); @@ -285,6 +343,23 @@ gst_base_src_is_live (GstBaseSrc * src) return result; } +/** + * gst_base_src_set_format: + * @src: base source instance + * @format: the format to use + * + * Sets the default format of the source. This will be the format used + * for sending NEW_SEGMENT events and for performing seeks. + * + * If a format of GST_FORMAT_BYTES is set, the element will be able to + * operate in pull mode if the #GstBaseSrc::is_seekable returns TRUE. + */ +void +gst_base_src_set_format (GstBaseSrc * src, GstFormat format) +{ + gst_segment_init (&src->segment, format); +} + static gboolean gst_base_src_setcaps (GstPad * pad, GstCaps * caps) { @@ -326,13 +401,10 @@ gst_base_src_getcaps (GstPad * pad) } static gboolean -gst_base_src_query (GstPad * pad, GstQuery * query) +gst_base_src_default_query (GstBaseSrc * src, GstQuery * query) { - GstBaseSrc *src; gboolean res; - src = GST_BASE_SRC (gst_pad_get_parent (pad)); - switch (GST_QUERY_TYPE (query)) { case GST_QUERY_POSITION: { @@ -340,31 +412,45 @@ gst_base_src_query (GstPad * pad, GstQuery * query) gst_query_parse_position (query, &format, NULL); switch (format) { - case GST_FORMAT_DEFAULT: - case GST_FORMAT_BYTES: - gst_query_set_position (query, GST_FORMAT_BYTES, src->offset); - res = TRUE; - break; case GST_FORMAT_PERCENT: { - gboolean b; - gint64 i64; - guint64 ui64; + gint64 percent; + gint64 position; + gint64 duration; - b = gst_base_src_get_size (src, &ui64); - if (b && src->offset < ui64) - i64 = gst_util_uint64_scale (GST_FORMAT_PERCENT_MAX, src->offset, - ui64); - else - i64 = GST_FORMAT_PERCENT_MAX; + position = src->segment.last_stop; + duration = src->segment.duration; - gst_query_set_position (query, GST_FORMAT_PERCENT, i64); + if (position != -1 && duration != -1) { + if (position < duration) + percent = gst_util_uint64_scale (GST_FORMAT_PERCENT_MAX, position, + duration); + else + percent = GST_FORMAT_PERCENT_MAX; + } else + percent = -1; + + gst_query_set_position (query, GST_FORMAT_PERCENT, percent); res = TRUE; break; } default: - res = FALSE; + { + gint64 position; + + position = src->segment.last_stop; + + if (position != -1) { + /* convert to requested format */ + res = + gst_pad_query_convert (src->srcpad, src->segment.format, + position, &format, &position); + } else + res = TRUE; + + gst_query_set_position (query, format, position); break; + } } break; } @@ -374,133 +460,151 @@ gst_base_src_query (GstPad * pad, GstQuery * query) gst_query_parse_duration (query, &format, NULL); switch (format) { - case GST_FORMAT_DEFAULT: - case GST_FORMAT_BYTES: - { - gboolean b; - gint64 i64; - guint64 ui64; - - b = gst_base_src_get_size (src, &ui64); - /* better to make get_size take an int64 */ - i64 = b ? (gint64) ui64 : -1; - gst_query_set_duration (query, GST_FORMAT_BYTES, i64); - res = TRUE; - break; - } case GST_FORMAT_PERCENT: gst_query_set_duration (query, GST_FORMAT_PERCENT, GST_FORMAT_PERCENT_MAX); res = TRUE; break; default: - res = FALSE; + { + gint64 duration; + + duration = src->segment.duration; + + if (duration != -1) { + /* convert to requested format */ + res = + gst_pad_query_convert (src->srcpad, src->segment.format, + duration, &format, &duration); + } else { + res = TRUE; + } + gst_query_set_duration (query, format, duration); break; + } } break; } case GST_QUERY_SEEKING: - gst_query_set_seeking (query, GST_FORMAT_BYTES, - src->seekable, 0, src->size); + { + gst_query_set_seeking (query, src->segment.format, + src->seekable, 0, src->segment.duration); res = TRUE; break; - + } case GST_QUERY_SEGMENT: { gint64 start, stop; - start = src->segment.start; - /* no end segment configured, current size then */ + /* no end segment configured, current duration then */ if ((stop = src->segment.stop) == -1) - stop = src->size; + stop = src->segment.duration; + start = src->segment.start; - /* FIXME, we can't report our rate as we did not store it, d'oh!. - * Also, subclasses might want to support other formats. */ - gst_query_set_segment (query, 1.0, GST_FORMAT_BYTES, start, stop); + /* adjust to stream time */ + if (src->segment.time != -1) { + start -= src->segment.time; + if (stop != -1) + stop -= src->segment.time; + } + gst_query_set_segment (query, src->segment.rate, src->segment.format, + start, stop); res = TRUE; break; } case GST_QUERY_FORMATS: + { gst_query_set_formats (query, 3, GST_FORMAT_DEFAULT, GST_FORMAT_BYTES, GST_FORMAT_PERCENT); res = TRUE; break; - + } case GST_QUERY_LATENCY: case GST_QUERY_JITTER: case GST_QUERY_RATE: case GST_QUERY_CONVERT: + { + GstFormat src_fmt, dest_fmt; + gint64 src_val, dest_val; + + gst_query_parse_convert (query, &src_fmt, &src_val, &dest_fmt, &dest_val); + + /* we can only convert between equal formats... */ + if (src_fmt == dest_fmt) { + dest_val = src_val; + res = TRUE; + } else + res = FALSE; + + gst_query_set_convert (query, src_fmt, src_val, dest_fmt, dest_val); + break; + } default: - res = gst_pad_query_default (pad, query); + res = FALSE; break; } - - gst_object_unref (src); return res; } static gboolean -gst_base_src_default_newsegment (GstBaseSrc * src) +gst_base_src_query (GstPad * pad, GstQuery * query) { - GstEvent *event; + GstBaseSrc *src; + GstBaseSrcClass *bclass; + gboolean result = FALSE; - GST_DEBUG_OBJECT (src, "Sending newsegment from %" G_GINT64_FORMAT - " to %" G_GINT64_FORMAT, src->segment.start, src->segment.stop); + src = GST_BASE_SRC (gst_pad_get_parent (pad)); - event = gst_event_new_new_segment (FALSE, 1.0, - GST_FORMAT_BYTES, src->segment.start, src->segment.stop, - src->segment.start); + bclass = GST_BASE_SRC_GET_CLASS (src); - return gst_pad_push_event (src->srcpad, event); + if (bclass->query) + result = bclass->query (src, query); + else + result = gst_pad_query_default (pad, query); + + gst_object_unref (src); + + return result; } static gboolean -gst_base_src_newsegment (GstBaseSrc * src) +gst_base_src_default_do_seek (GstBaseSrc * src, GstSegment * segment) +{ + gboolean res = TRUE; + + /* update our offset if the start/stop position was updated */ + if (segment->format == GST_FORMAT_BYTES) { + segment->last_stop = segment->start; + segment->time = segment->start; + } else if (segment->start == 0) { + /* seek to start, we can implement a default for this. */ + segment->last_stop = 0; + segment->time = 0; + res = TRUE; + } else + res = FALSE; + + return res; +} + +static gboolean +gst_base_src_do_seek (GstBaseSrc * src, GstSegment * segment) { GstBaseSrcClass *bclass; gboolean result = FALSE; bclass = GST_BASE_SRC_GET_CLASS (src); - if (bclass->newsegment) - result = bclass->newsegment (src); + if (bclass->do_seek) + result = bclass->do_seek (src, segment); return result; } -/* based on the event parameters configure the segment.start/stop - * times. Called with STREAM_LOCK. - */ -static gboolean -gst_base_src_configure_segment (GstBaseSrc * src, GstEvent * event) -{ - gdouble rate; - GstFormat format; - GstSeekFlags flags; - GstSeekType cur_type, stop_type; - gint64 cur, stop; - gboolean update; - - gst_event_parse_seek (event, &rate, &format, &flags, - &cur_type, &cur, &stop_type, &stop); - - gst_segment_set_seek (&src->segment, rate, format, flags, - cur_type, cur, stop_type, stop, &update); - - /* update our offset if it was updated */ - if (update) - src->offset = cur; - - GST_DEBUG_OBJECT (src, "segment configured from %" G_GINT64_FORMAT - " to %" G_GINT64_FORMAT, src->segment.start, src->segment.stop); - - return TRUE; -} - /* this code implements the seeking. It is a good example - * handling all cases (modulo the FIXMEs). + * handling all cases. * * A seek updates the currently configured segment.start * and segment.stop values based on the SEEK_TYPE. If the @@ -526,6 +630,12 @@ gst_base_src_configure_segment (GstBaseSrc * src, GstEvent * event) * unblocks the sample. In any case we acquire the STREAM_LOCK and * can continue the seek. A non-flushing seek is normally done in a * running pipeline to perform seamless playback. + * In the case of a non-flushing seek we need to make sure that the + * data we output after the seek is continuous with the previous data, + * this is because a non-flushing seek does not reset the stream-time + * to 0. We do this by closing the currently running segment, ie. sending + * a new_segment event with the stop position set to the last processed + * position. * * After updating the segment.start/stop values, we prepare for * streaming again. We push out a FLUSH_STOP to make the peer pad @@ -536,23 +646,48 @@ gst_base_src_configure_segment (GstBaseSrc * src, GstEvent * event) * when we reach the segment.stop we have to post a segment.done * instead of EOS when doing a segment seek. */ +/* FIXME, we have the unlock gboolean here because most current implementations + * (fdsrc, -base/gst/tcp/, ...) not only unlock when there is something to + * unlock + */ static gboolean -gst_base_src_do_seek (GstBaseSrc * src, GstEvent * event) +gst_base_src_perform_seek (GstBaseSrc * src, GstEvent * event, gboolean unlock) { + gboolean res; gdouble rate; GstFormat format; GstSeekFlags flags; + GstSeekType cur_type, stop_type; + gint64 cur, stop; gboolean flush; + gboolean update; + GstSegment seeksegment; - gst_event_parse_seek (event, &rate, &format, &flags, NULL, NULL, NULL, NULL); + GST_DEBUG_OBJECT (src, "doing seek"); - /* FIXME subclasses should be able to provide other formats */ - /* get seek format */ - if (format == GST_FORMAT_DEFAULT) - format = GST_FORMAT_BYTES; - /* we can only seek bytes */ - if (format != GST_FORMAT_BYTES) - goto unsupported_seek; + if (event) { + gst_event_parse_seek (event, &rate, &format, &flags, + &cur_type, &cur, &stop_type, &stop); + + /* we have to have a format as the segment format. Try to convert + * if not. */ + if (src->segment.format != format) { + GstFormat fmt; + + fmt = src->segment.format; + res = TRUE; + if (cur_type != GST_SEEK_TYPE_NONE) + res = gst_pad_query_convert (src->srcpad, format, cur, &fmt, &cur); + if (res && stop_type != GST_SEEK_TYPE_NONE) + res = gst_pad_query_convert (src->srcpad, format, stop, &fmt, &stop); + if (!res) + goto no_format; + + format = fmt; + } + } else { + flags = 0; + } flush = flags & GST_SEEK_FLAG_FLUSH; @@ -563,33 +698,73 @@ gst_base_src_do_seek (GstBaseSrc * src, GstEvent * event) gst_pad_pause_task (src->srcpad); /* unblock streaming thread */ - gst_base_src_unlock (src); + if (unlock) + gst_base_src_unlock (src); /* grab streaming lock, this should eventually be possible, either * because the task is paused or out streaming thread stopped * because our peer is flushing. */ GST_PAD_STREAM_LOCK (src->srcpad); - /* now configure the segment */ - gst_base_src_configure_segment (src, event); + /* make copy into temp structure, we can only update the main one + * when the subclass actually could to the seek. */ + memcpy (&seeksegment, &src->segment, sizeof (GstSegment)); + + /* now configure the seek segment */ + if (event) { + gst_segment_set_seek (&seeksegment, rate, format, flags, + cur_type, cur, stop_type, stop, &update); + } + + GST_DEBUG_OBJECT (src, "segment configured from %" G_GINT64_FORMAT + " to %" G_GINT64_FORMAT ", position %" G_GINT64_FORMAT, + seeksegment.start, seeksegment.stop, seeksegment.last_stop); + + /* do the seek, segment.last_stop contains new position. */ + res = gst_base_src_do_seek (src, &seeksegment); /* and prepare to continue streaming */ - if (flush) + if (flush) { /* send flush stop, peer will accept data and events again. We * are not yet providing data as we still have the STREAM_LOCK. */ gst_pad_push_event (src->srcpad, gst_event_new_flush_stop ()); + } else if (res && src->ABI.running) { + /* we are running the current segment and doing a non-flushing seek, + * close the segment first based on the last_stop. */ + GST_DEBUG_OBJECT (src, "closing running segment %" G_GINT64_FORMAT + " to %" G_GINT64_FORMAT, src->segment.start, src->segment.last_stop); - /* now make sure the newsegment will be send from the streaming - * thread. We could opt to send it here too. */ - src->need_newsegment = TRUE; - - if (src->segment.flags & GST_SEEK_FLAG_SEGMENT) { - /* FIXME subclasses should be able to provide other formats */ - gst_element_post_message (GST_ELEMENT (src), - gst_message_new_segment_start (GST_OBJECT (src), GST_FORMAT_BYTES, - src->segment.start)); + gst_pad_push_event (src->srcpad, + gst_event_new_new_segment (TRUE, + src->segment.rate, src->segment.format, + src->segment.start, src->segment.last_stop, src->segment.time)); } + /* if successfull seek, we update our real segment and push + * out the new segment. */ + if (res) { + memcpy (&src->segment, &seeksegment, sizeof (GstSegment)); + + if (src->segment.flags & GST_SEEK_FLAG_SEGMENT) { + gst_element_post_message (GST_ELEMENT (src), + gst_message_new_segment_start (GST_OBJECT (src), + src->segment.format, src->segment.last_stop)); + } + + if ((stop = src->segment.stop) == -1) + stop = src->segment.duration; + + /* now send the newsegment */ + GST_DEBUG_OBJECT (src, "Sending newsegment from %" G_GINT64_FORMAT + " to %" G_GINT64_FORMAT, src->segment.start, stop); + + gst_pad_push_event (src->srcpad, + gst_event_new_new_segment (FALSE, + src->segment.rate, src->segment.format, + src->segment.last_stop, stop, src->segment.time)); + } + + src->ABI.running = TRUE; /* and restart the task in case it got paused explicitely or by * the FLUSH_START event we pushed out. */ gst_pad_start_task (src->srcpad, (GstTaskFunction) gst_base_src_loop, @@ -598,13 +773,12 @@ gst_base_src_do_seek (GstBaseSrc * src, GstEvent * event) /* and release the lock again so we can continue streaming */ GST_PAD_STREAM_UNLOCK (src->srcpad); - return TRUE; + return res; /* ERROR */ -unsupported_seek: +no_format: { - GST_DEBUG_OBJECT (src, "invalid format, seek aborted."); - + GST_DEBUG_OBJECT (src, "undefined format given, seek aborted."); return FALSE; } } @@ -621,38 +795,38 @@ gst_base_src_send_event (GstElement * element, GstEvent * event) switch (GST_EVENT_TYPE (event)) { case GST_EVENT_SEEK: - result = gst_base_src_configure_segment (src, event); + { + GST_OBJECT_LOCK (src); + /* gst_event_replace? */ + if (src->ABI.pending_seek) + gst_event_unref (src->ABI.pending_seek); + gst_event_ref (event); + src->ABI.pending_seek = event; + GST_OBJECT_UNLOCK (src); + result = TRUE; break; + } default: result = FALSE; break; } + gst_event_unref (event); return result; } static gboolean -gst_base_src_event_handler (GstPad * pad, GstEvent * event) +gst_base_src_default_event (GstBaseSrc * src, GstEvent * event) { - GstBaseSrc *src; - GstBaseSrcClass *bclass; gboolean result; - src = GST_BASE_SRC (gst_pad_get_parent (pad)); - bclass = GST_BASE_SRC_GET_CLASS (src); - - if (bclass->event) { - if (!(result = bclass->event (src, event))) - goto subclass_failed; - } - switch (GST_EVENT_TYPE (event)) { case GST_EVENT_SEEK: /* is normally called when in push mode */ if (!src->seekable) goto not_seekable; - result = gst_base_src_do_seek (src, event); + result = gst_base_src_perform_seek (src, event, TRUE); break; case GST_EVENT_FLUSH_START: /* cancel any blocking getrange, is normally called @@ -664,6 +838,32 @@ gst_base_src_event_handler (GstPad * pad, GstEvent * event) result = TRUE; break; } + return result; + + /* ERRORS */ +not_seekable: + { + GST_DEBUG_OBJECT (src, "is not seekable"); + return FALSE; + } +} + +static gboolean +gst_base_src_event_handler (GstPad * pad, GstEvent * event) +{ + GstBaseSrc *src; + GstBaseSrcClass *bclass; + gboolean result = FALSE; + + src = GST_BASE_SRC (gst_pad_get_parent (pad)); + bclass = GST_BASE_SRC_GET_CLASS (src); + + if (bclass->event) { + if (!(result = bclass->event (src, event))) + goto subclass_failed; + } + +done: gst_event_unref (event); gst_object_unref (src); @@ -673,16 +873,7 @@ gst_base_src_event_handler (GstPad * pad, GstEvent * event) subclass_failed: { GST_DEBUG_OBJECT (src, "subclass refused event"); - gst_object_unref (src); - gst_event_unref (event); - return result; - } -not_seekable: - { - GST_DEBUG_OBJECT (src, "is not seekable"); - gst_object_unref (src); - gst_event_unref (event); - return FALSE; + goto done; } } @@ -701,6 +892,9 @@ gst_base_src_set_property (GObject * object, guint prop_id, case PROP_NUM_BUFFERS: src->num_buffers = g_value_get_int (value); break; + case PROP_TYPEFIND: + src->ABI.typefind = g_value_get_boolean (value); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -722,6 +916,9 @@ gst_base_src_get_property (GObject * object, guint prop_id, GValue * value, case PROP_NUM_BUFFERS: g_value_set_int (value, src->num_buffers); break; + case PROP_TYPEFIND: + g_value_set_boolean (value, src->ABI.typefind); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -736,6 +933,7 @@ gst_base_src_wait (GstBaseSrc * basesrc, GstClockTime time) GstClockID id; GstClock *clock; + /* get clock, if no clock, we don't sync */ if ((clock = GST_ELEMENT_CLOCK (basesrc)) == NULL) return GST_CLOCK_OK; @@ -758,16 +956,15 @@ gst_base_src_wait (GstBaseSrc * basesrc, GstClockTime time) return ret; } - -/* perform synchronisation on a buffer +/* perform synchronisation on a buffer. + * with STREAM_LOCK. */ static GstClockReturn gst_base_src_do_sync (GstBaseSrc * basesrc, GstBuffer * buffer) { - GstClockReturn result = GST_CLOCK_OK; + GstClockReturn result; GstClockTime start, end; GstBaseSrcClass *bclass; - gboolean start_valid; GstClockTime base_time; bclass = GST_BASE_SRC_GET_CLASS (basesrc); @@ -776,16 +973,9 @@ gst_base_src_do_sync (GstBaseSrc * basesrc, GstBuffer * buffer) if (bclass->get_times) bclass->get_times (basesrc, buffer, &start, &end); - start_valid = GST_CLOCK_TIME_IS_VALID (start); - /* if we don't have a timestamp, we don't sync */ - if (!start_valid) { - GST_DEBUG_OBJECT (basesrc, "get_times returned invalid start"); - goto done; - } - - GST_DEBUG_OBJECT (basesrc, "got times start: %" GST_TIME_FORMAT - ", end: %" GST_TIME_FORMAT, GST_TIME_ARGS (start), GST_TIME_ARGS (end)); + if (!GST_CLOCK_TIME_IS_VALID (start)) + goto invalid_start; /* now do clocking */ GST_OBJECT_LOCK (basesrc); @@ -801,10 +991,73 @@ gst_base_src_do_sync (GstBaseSrc * basesrc, GstBuffer * buffer) GST_LOG_OBJECT (basesrc, "clock entry done: %d", result); -done: return result; + + /* special cases */ +invalid_start: + { + GST_DEBUG_OBJECT (basesrc, "get_times returned invalid start"); + return GST_CLOCK_OK; + } } +static gboolean +gst_base_src_update_length (GstBaseSrc * src, guint64 offset, guint * length) +{ + guint64 size, maxsize; + GstBaseSrcClass *bclass; + + bclass = GST_BASE_SRC_GET_CLASS (src); + + /* only operate if we are working with bytes */ + if (src->segment.format != GST_FORMAT_BYTES) + return TRUE; + + size = (guint64) src->segment.duration; + + /* the max amount of bytes to read is the total size or + * up to the segment.stop if present. */ + if (src->segment.stop != -1) + maxsize = MIN (size, src->segment.stop); + else + maxsize = size; + + GST_DEBUG_OBJECT (src, + "reading offset %" G_GUINT64_FORMAT ", length %u, size %" G_GINT64_FORMAT + ", segment.stop %" G_GINT64_FORMAT ", maxsize %" G_GINT64_FORMAT, offset, + *length, size, src->segment.stop, maxsize); + + /* check size */ + if (maxsize != -1) { + if (offset > maxsize) + goto unexpected_length; + + if (offset + *length > maxsize) { + /* see if length of the file changed */ + if (bclass->get_size) + bclass->get_size (src, &size); + + if (src->segment.stop != -1) + maxsize = MIN (size, src->segment.stop); + else + maxsize = size; + + if (offset + *length > maxsize) { + *length = maxsize - offset; + } + } + } + if (*length == 0) + goto unexpected_length; + + return TRUE; + + /* ERRORS */ +unexpected_length: + { + return FALSE; + } +} static GstFlowReturn gst_base_src_get_range (GstBaseSrc * src, guint64 offset, guint length, @@ -812,7 +1065,6 @@ gst_base_src_get_range (GstBaseSrc * src, guint64 offset, guint length, { GstFlowReturn ret; GstBaseSrcClass *bclass; - gint64 maxsize; GstClockReturn status; bclass = GST_BASE_SRC_GET_CLASS (src); @@ -840,39 +1092,7 @@ gst_base_src_get_range (GstBaseSrc * src, guint64 offset, guint length, if (G_UNLIKELY (!bclass->create)) goto no_function; - /* the max amount of bytes to read is the total size or - * up to the segment.stop if present. */ - if (src->segment.stop != -1) - maxsize = MIN (src->size, src->segment.stop); - else - maxsize = src->size; - - GST_DEBUG_OBJECT (src, - "reading offset %" G_GUINT64_FORMAT ", length %u, size %" G_GINT64_FORMAT - ", segment.stop %" G_GINT64_FORMAT ", maxsize %" G_GINT64_FORMAT, offset, - length, src->size, src->segment.stop, maxsize); - - /* check size */ - if (maxsize != -1) { - if (offset > maxsize) - goto unexpected_length; - - if (offset + length > maxsize) { - /* see if length of the file changed */ - if (bclass->get_size) - bclass->get_size (src, &src->size); - - if (src->segment.stop != -1) - maxsize = MIN (src->size, src->segment.stop); - else - maxsize = src->size; - - if (offset + length > maxsize) { - length = maxsize - offset; - } - } - } - if (length == 0) + if (!gst_base_src_update_length (src, offset, &length)) goto unexpected_length; if (src->num_buffers_left == 0) { @@ -886,6 +1106,11 @@ gst_base_src_get_range (GstBaseSrc * src, guint64 offset, guint length, if (ret != GST_FLOW_OK) goto done; + /* no timestamp set and we are at offset 0 */ + if (offset == 0 && src->segment.time == 0 + && GST_BUFFER_TIMESTAMP (*buf) == -1) + GST_BUFFER_TIMESTAMP (*buf) = 0; + /* now sync before pushing the buffer */ status = gst_base_src_do_sync (src, *buf); switch (status) { @@ -926,7 +1151,7 @@ no_function: unexpected_length: { GST_DEBUG_OBJECT (src, "unexpected length %u (offset=%" G_GUINT64_FORMAT - ", size=%" G_GUINT64_FORMAT ")", length, offset, src->size); + ", size=%" G_GINT64_FORMAT ")", length, offset, src->segment.duration); return GST_FLOW_UNEXPECTED; } reached_num_buffers: @@ -956,15 +1181,23 @@ static gboolean gst_base_src_check_get_range (GstPad * pad) { GstBaseSrc *src; + gboolean res; - src = GST_BASE_SRC (GST_OBJECT_PARENT (pad)); + src = GST_BASE_SRC (gst_pad_get_parent (pad)); if (!GST_OBJECT_FLAG_IS_SET (src, GST_BASE_SRC_STARTED)) { gst_base_src_start (src); gst_base_src_stop (src); } - return src->seekable; + /* we can operate in getrange mode if the native format is bytes + * and we are seekable, this condition is set in the random_access + * flag and is set in the _start() method. */ + res = src->random_access; + + gst_object_unref (src); + + return res; } static void @@ -973,17 +1206,17 @@ gst_base_src_loop (GstPad * pad) GstBaseSrc *src; GstBuffer *buf = NULL; GstFlowReturn ret; + gint64 position; src = GST_BASE_SRC (gst_pad_get_parent (pad)); - /* only send segments when operating in push mode */ - if (G_UNLIKELY (src->need_newsegment)) { - /* now send newsegment */ - gst_base_src_newsegment (src); - src->need_newsegment = FALSE; - } + /* if we operate in bytes, we can calculate an offset */ + if (src->segment.format == GST_FORMAT_BYTES) + position = src->segment.last_stop; + else + position = -1; - ret = gst_base_src_get_range (src, src->offset, src->blocksize, &buf); + ret = gst_base_src_get_range (src, position, src->blocksize, &buf); if (G_UNLIKELY (ret != GST_FLOW_OK)) { if (ret == GST_FLOW_UNEXPECTED) goto eos; @@ -993,12 +1226,42 @@ gst_base_src_loop (GstPad * pad) if (G_UNLIKELY (buf == NULL)) goto error; - src->offset += GST_BUFFER_SIZE (buf); + /* figure out the new position */ + switch (src->segment.format) { + case GST_FORMAT_BYTES: + position += GST_BUFFER_SIZE (buf); + break; + case GST_FORMAT_TIME: + { + GstClockTime start, duration; + + start = GST_BUFFER_TIMESTAMP (buf); + duration = GST_BUFFER_DURATION (buf); + + if (GST_CLOCK_TIME_IS_VALID (start)) + position = start; + else + position = src->segment.last_stop; + + if (GST_CLOCK_TIME_IS_VALID (duration)) + position += duration; + break; + } + case GST_FORMAT_DEFAULT: + position = GST_BUFFER_OFFSET_END (buf); + break; + default: + position = -1; + break; + } + if (position != -1) + gst_segment_set_last_stop (&src->segment, src->segment.format, position); ret = gst_pad_push (pad, buf); if (G_UNLIKELY (ret != GST_FLOW_OK)) goto pause; +done: gst_object_unref (src); return; @@ -1006,18 +1269,17 @@ gst_base_src_loop (GstPad * pad) eos: { GST_DEBUG_OBJECT (src, "going to EOS, getrange returned UNEXPECTED"); + /* we finished the segment */ + src->ABI.running = FALSE; gst_pad_pause_task (pad); if (src->segment.flags & GST_SEEK_FLAG_SEGMENT) { - /* FIXME, subclass might want to use another format */ gst_element_post_message (GST_ELEMENT (src), gst_message_new_segment_done (GST_OBJECT (src), - GST_FORMAT_BYTES, src->segment.stop)); + src->segment.format, src->segment.last_stop)); } else { gst_pad_push_event (pad, gst_event_new_eos ()); } - - gst_object_unref (src); - return; + goto done; } pause: { @@ -1032,24 +1294,23 @@ pause: ("streaming task paused, reason %s", reason)); gst_pad_push_event (pad, gst_event_new_eos ()); } - gst_object_unref (src); - return; + goto done; } error: { GST_ELEMENT_ERROR (src, STREAM, FAILED, (_("Internal data flow error.")), ("element returned NULL buffer")); + /* we finished the segment on error */ + src->ABI.running = FALSE; gst_pad_pause_task (pad); gst_pad_push_event (pad, gst_event_new_eos ()); - - gst_object_unref (src); - return; + goto done; } } /* this will always be called between start() and stop(). So you can rely on - resources allocated by start() and freed from stop(). This needs to be added - to the docs at some point. */ + * resources allocated by start() and freed from stop(). This needs to be added + * to the docs at some point. */ static gboolean gst_base_src_unlock (GstBaseSrc * basesrc) { @@ -1075,41 +1336,11 @@ gst_base_src_unlock (GstBaseSrc * basesrc) return result; } -static gboolean -gst_base_src_get_size (GstBaseSrc * basesrc, guint64 * size) -{ - GstBaseSrcClass *bclass; - gboolean result = FALSE; - - bclass = GST_BASE_SRC_GET_CLASS (basesrc); - if (bclass->get_size) - result = bclass->get_size (basesrc, size); - - if (result) - basesrc->size = *size; - - return result; -} - -static gboolean -gst_base_src_is_seekable (GstBaseSrc * basesrc) -{ - GstBaseSrcClass *bclass; - - bclass = GST_BASE_SRC_GET_CLASS (basesrc); - - /* check if we can seek */ - if (bclass->is_seekable) - basesrc->seekable = bclass->is_seekable (basesrc); - else - basesrc->seekable = FALSE; - - GST_DEBUG_OBJECT (basesrc, "is seekable: %d", basesrc->seekable); - - return basesrc->seekable; -} - -/* default negotiation code */ +/* default negotiation code. + * + * Take intersection between src and sink pads, take first + * caps and fixate. + */ static gboolean gst_base_src_default_negotiate (GstBaseSrc * basesrc) { @@ -1137,7 +1368,7 @@ gst_base_src_default_negotiate (GstBaseSrc * basesrc) gst_caps_unref (thiscaps); gst_caps_unref (peercaps); if (icaps) { - /* take first (and best) possibility */ + /* take first (and best, since they are sorted) possibility */ caps = gst_caps_copy_nth (icaps, 0); gst_caps_unref (icaps); } @@ -1157,15 +1388,14 @@ gst_base_src_default_negotiate (GstBaseSrc * basesrc) if (gst_caps_is_any (caps)) { /* hmm, still anything, so element can do anything and * nego is not needed */ - gst_caps_unref (caps); result = TRUE; } else if (gst_caps_is_fixed (caps)) { /* yay, fixed caps, use those then */ gst_pad_set_caps (GST_BASE_SRC_PAD (basesrc), caps); - gst_caps_unref (caps); result = TRUE; } } + gst_caps_unref (caps); } return result; @@ -1197,6 +1427,7 @@ gst_base_src_start (GstBaseSrc * basesrc) { GstBaseSrcClass *bclass; gboolean result; + guint64 size; if (GST_OBJECT_FLAG_IS_SET (basesrc, GST_BASE_SRC_STARTED)) return TRUE; @@ -1205,6 +1436,9 @@ gst_base_src_start (GstBaseSrc * basesrc) basesrc->num_buffers_left = basesrc->num_buffers; + gst_segment_init (&basesrc->segment, basesrc->segment.format); + basesrc->ABI.running = FALSE; + bclass = GST_BASE_SRC_GET_CLASS (basesrc); if (bclass->start) result = bclass->start (basesrc); @@ -1217,35 +1451,50 @@ gst_base_src_start (GstBaseSrc * basesrc) GST_OBJECT_FLAG_SET (basesrc, GST_BASE_SRC_STARTED); /* figure out the size */ - if (bclass->get_size) { - result = bclass->get_size (basesrc, &basesrc->size); - if (result == FALSE) - basesrc->size = -1; - } else { - result = FALSE; - basesrc->size = -1; + if (basesrc->segment.format == GST_FORMAT_BYTES) { + if (bclass->get_size) { + if (!(result = bclass->get_size (basesrc, &size))) + size = -1; + } else { + result = FALSE; + size = -1; + } + /* only update the size when operating in bytes, subclass is supposed + * to set duration in the start method for other formats */ + gst_segment_set_duration (&basesrc->segment, GST_FORMAT_BYTES, size); } - GST_DEBUG ("size %d %lld", result, basesrc->size); + GST_DEBUG_OBJECT (basesrc, + "format: %d, have size: %d, size: %" G_GUINT64_FORMAT ", duration: %" + G_GINT64_FORMAT, basesrc->segment.format, result, size, + basesrc->segment.duration); - /* check if we can seek, updates ->seekable */ - gst_base_src_is_seekable (basesrc); + /* check if we can seek */ + if (bclass->is_seekable) + basesrc->seekable = bclass->is_seekable (basesrc); + else + basesrc->seekable = FALSE; - basesrc->need_newsegment = TRUE; + GST_DEBUG_OBJECT (basesrc, "is seekable: %d", basesrc->seekable); - /* run typefind */ -#if 0 - if (basesrc->seekable) { + /* update for random access flag */ + basesrc->random_access = basesrc->seekable && + basesrc->segment.format == GST_FORMAT_BYTES; + + GST_DEBUG_OBJECT (basesrc, "is random_access: %d", basesrc->random_access); + + /* run typefind if we are random_access and the typefinding is enabled. */ + if (basesrc->random_access && basesrc->ABI.typefind && size != -1) { GstCaps *caps; - caps = gst_type_find_helper (basesrc->srcpad, basesrc->size); + caps = gst_type_find_helper (basesrc->srcpad, size); gst_pad_set_caps (basesrc->srcpad, caps); gst_caps_unref (caps); + } else { + /* use class or default negotiate function */ + if (!gst_base_src_negotiate (basesrc)) + goto could_not_negotiate; } -#endif - - if (!gst_base_src_negotiate (basesrc)) - goto could_not_negotiate; return TRUE; @@ -1259,8 +1508,7 @@ could_not_negotiate: { GST_DEBUG_OBJECT (basesrc, "could not negotiate, stopping"); GST_ELEMENT_ERROR (basesrc, STREAM, FORMAT, - ("Could not connect source to pipeline"), - ("Check your filtered caps, if any")); + ("Could not negotiate format"), ("Check your filtered caps, if any")); gst_base_src_stop (basesrc); return FALSE; } @@ -1316,6 +1564,8 @@ gst_base_src_activate_push (GstPad * pad, gboolean active) /* prepare subclass first */ if (active) { + GstEvent *event; + GST_DEBUG_OBJECT (basesrc, "Activating in push mode"); if (!basesrc->can_activate_push) @@ -1324,7 +1574,18 @@ gst_base_src_activate_push (GstPad * pad, gboolean active) if (!gst_base_src_start (basesrc)) goto error_start; - res = gst_pad_start_task (pad, (GstTaskFunction) gst_base_src_loop, pad); + /* do initial seek, which will start the task */ + GST_OBJECT_LOCK (basesrc); + event = basesrc->ABI.pending_seek; + basesrc->ABI.pending_seek = NULL; + GST_OBJECT_UNLOCK (basesrc); + + /* no need to unlock anything, the task is certainly + * not running here. */ + res = gst_base_src_perform_seek (basesrc, event, FALSE); + + if (event) + gst_event_unref (event); } else { GST_DEBUG_OBJECT (basesrc, "Deactivating in push mode"); res = gst_base_src_deactivate (basesrc, pad); @@ -1358,11 +1619,11 @@ gst_base_src_activate_pull (GstPad * pad, gboolean active) if (!gst_base_src_start (basesrc)) goto error_start; - if (!basesrc->seekable) { + /* if not random_access, we cannot operate in pull mode for now */ + if (!basesrc->random_access) { gst_base_src_stop (basesrc); return FALSE; } - return TRUE; } else { GST_DEBUG_OBJECT (basesrc, "Deactivating in pull mode"); @@ -1424,13 +1685,6 @@ gst_base_src_change_state (GstElement * element, GstStateChange transition) goto failure; switch (transition) { - case GST_STATE_CHANGE_NULL_TO_READY: - /* we always run from start to end when in READY, after putting - * the element to READY a seek can be done on the element to - * configure the segment when going to PAUSED. */ - gst_segment_init (&basesrc->segment, GST_FORMAT_BYTES); - basesrc->offset = 0; - break; case GST_STATE_CHANGE_PLAYING_TO_PAUSED: GST_LIVE_LOCK (element); if (basesrc->is_live) { @@ -1442,9 +1696,6 @@ gst_base_src_change_state (GstElement * element, GstStateChange transition) case GST_STATE_CHANGE_PAUSED_TO_READY: if (!gst_base_src_stop (basesrc)) goto error_stop; - /* we always run from start to end when in READY */ - gst_segment_init (&basesrc->segment, GST_FORMAT_BYTES); - basesrc->offset = 0; break; case GST_STATE_CHANGE_READY_TO_NULL: break; diff --git a/libs/gst/base/gstbasesrc.h b/libs/gst/base/gstbasesrc.h index 380f724d40..640ef0334c 100644 --- a/libs/gst/base/gstbasesrc.h +++ b/libs/gst/base/gstbasesrc.h @@ -93,18 +93,25 @@ struct _GstBaseSrc { GstSegment segment; gboolean need_newsegment; - guint64 offset; /* current offset in the resource */ - guint64 size; /* total size of the resource */ + guint64 offset; /* current offset in the resource, unused */ + guint64 size; /* total size of the resource, unused */ gint num_buffers; gint num_buffers_left; /*< private >*/ - gpointer _gst_reserved[GST_PADDING_LARGE]; + union { + struct { + gboolean typefind; + gboolean running; + GstEvent *pending_seek; + } ABI; + gpointer _gst_reserved[GST_PADDING_LARGE]; + }; }; /** - * _GstBaseSrcClass: + * GstBaseSrcClass: * @create: ask the subclass to create a buffer with offset and size * @start: start processing */ @@ -150,8 +157,14 @@ struct _GstBaseSrcClass { GstFlowReturn (*create) (GstBaseSrc *src, guint64 offset, guint size, GstBuffer **buf); + /* additions that change padding... */ + /* notify subclasses of a seek */ + gboolean (*do_seek) (GstBaseSrc *src, GstSegment *segment); + /* notify subclasses of a query */ + gboolean (*query) (GstBaseSrc *src, GstQuery *query); + /*< private >*/ - gpointer _gst_reserved[GST_PADDING_LARGE]; + gpointer _gst_reserved[GST_PADDING_LARGE - 2]; }; GType gst_base_src_get_type (void); @@ -159,6 +172,8 @@ GType gst_base_src_get_type (void); void gst_base_src_set_live (GstBaseSrc *src, gboolean live); gboolean gst_base_src_is_live (GstBaseSrc *src); +void gst_base_src_set_format (GstBaseSrc *src, GstFormat format); + G_END_DECLS #endif /* __GST_BASE_SRC_H__ */