typefind: Do typefinding from a separate thread and not from the state change function

This commit is contained in:
Sebastian Dröge 2012-01-27 16:18:00 +01:00
parent fe9e284463
commit bf0964b63a
2 changed files with 278 additions and 132 deletions

View file

@ -157,8 +157,10 @@ static GstFlowReturn gst_type_find_element_getrange (GstPad * srcpad,
static GstStateChangeReturn
gst_type_find_element_change_state (GstElement * element,
GstStateChange transition);
static gboolean gst_type_find_element_activate (GstPad * pad,
static gboolean gst_type_find_element_activate_sink (GstPad * pad,
GstObject * parent);
static gboolean gst_type_find_element_activate_sink_mode (GstPad * pad,
GstObject * parent, GstPadMode mode, gboolean active);
static gboolean gst_type_find_element_activate_src_mode (GstPad * pad,
GstObject * parent, GstPadMode mode, gboolean active);
static GstFlowReturn
@ -167,6 +169,8 @@ gst_type_find_element_chain_do_typefinding (GstTypeFindElement * typefind,
static void gst_type_find_element_send_cached_events (GstTypeFindElement *
typefind);
static void gst_type_find_element_loop (GstPad * pad);
static guint gst_type_find_element_signals[LAST_SIGNAL] = { 0 };
static void
@ -260,7 +264,9 @@ gst_type_find_element_init (GstTypeFindElement * typefind)
"sink");
gst_pad_set_activate_function (typefind->sink,
GST_DEBUG_FUNCPTR (gst_type_find_element_activate));
GST_DEBUG_FUNCPTR (gst_type_find_element_activate_sink));
gst_pad_set_activatemode_function (typefind->sink,
GST_DEBUG_FUNCPTR (gst_type_find_element_activate_sink_mode));
gst_pad_set_chain_function (typefind->sink,
GST_DEBUG_FUNCPTR (gst_type_find_element_chain));
gst_pad_set_event_function (typefind->sink,
@ -410,21 +416,85 @@ out:
return res;
}
#if 0
static const GstEventMask *
gst_type_find_element_src_event_mask (GstPad * pad)
static gboolean
gst_type_find_element_seek (GstTypeFindElement * typefind, GstEvent * event)
{
static const GstEventMask mask[] = {
{GST_EVENT_SEEK,
GST_SEEK_METHOD_SET | GST_SEEK_METHOD_CUR | GST_SEEK_METHOD_END |
GST_SEEK_FLAG_FLUSH},
/* add more if you want, event masks suck and need to die anyway */
{0,}
};
GstSeekFlags flags;
GstSeekType cur_type, stop_type;
GstFormat format;
gboolean flush;
gdouble rate;
gint64 cur, stop;
GstSegment seeksegment = { 0, };
return mask;
gst_event_parse_seek (event, &rate, &format, &flags, &cur_type, &cur,
&stop_type, &stop);
/* we can only seek on bytes */
if (format != GST_FORMAT_BYTES) {
GST_DEBUG_OBJECT (typefind, "Can only seek on BYTES");
return FALSE;
}
/* copy segment, we need this because we still need the old
* segment when we close the current segment. */
memcpy (&seeksegment, &typefind->segment, sizeof (GstSegment));
GST_DEBUG_OBJECT (typefind, "configuring seek");
gst_segment_do_seek (&seeksegment, rate, format, flags,
cur_type, cur, stop_type, stop, NULL);
flush = ! !(flags & GST_SEEK_FLAG_FLUSH);
GST_DEBUG_OBJECT (typefind, "New segment %" GST_SEGMENT_FORMAT, &seeksegment);
if (flush) {
GST_DEBUG_OBJECT (typefind, "Starting flush");
gst_pad_push_event (typefind->sink, gst_event_new_flush_start ());
gst_pad_push_event (typefind->src, gst_event_new_flush_start ());
} else {
GST_DEBUG_OBJECT (typefind, "Non-flushing seek, pausing task");
gst_pad_pause_task (typefind->sink);
}
/* now grab the stream lock so that streaming cannot continue, for
* non flushing seeks when the element is in PAUSED this could block
* forever. */
GST_DEBUG_OBJECT (typefind, "Waiting for streaming to stop");
GST_PAD_STREAM_LOCK (typefind->sink);
if (flush) {
GST_DEBUG_OBJECT (typefind, "Stopping flush");
gst_pad_push_event (typefind->sink, gst_event_new_flush_stop (TRUE));
gst_pad_push_event (typefind->src, gst_event_new_flush_stop (TRUE));
}
/* now update the real segment info */
GST_DEBUG_OBJECT (typefind, "Committing new seek segment");
memcpy (&typefind->segment, &seeksegment, sizeof (GstSegment));
typefind->offset = typefind->segment.start;
/* notify start of new segment */
if (typefind->segment.flags & GST_SEEK_FLAG_SEGMENT) {
GstMessage *msg;
msg = gst_message_new_segment_start (GST_OBJECT (typefind),
GST_FORMAT_BYTES, typefind->segment.start);
gst_element_post_message (GST_ELEMENT (typefind), msg);
}
typefind->need_segment = TRUE;
/* restart our task since it might have been stopped when we did the
* flush. */
gst_pad_start_task (typefind->sink,
(GstTaskFunction) gst_type_find_element_loop, typefind->sink);
/* streaming can continue now */
GST_PAD_STREAM_UNLOCK (typefind->sink);
return TRUE;
}
#endif
static gboolean
gst_type_find_element_src_event (GstPad * pad, GstObject * parent,
@ -437,7 +507,14 @@ gst_type_find_element_src_event (GstPad * pad, GstObject * parent,
gst_mini_object_unref (GST_MINI_OBJECT_CAST (event));
return FALSE;
}
/* Only handle seeks here if driving the pipeline */
if (typefind->segment.format != GST_FORMAT_UNDEFINED &&
GST_EVENT_TYPE (event) == GST_EVENT_SEEK) {
return gst_type_find_element_seek (typefind, event);
} else {
return gst_pad_push_event (typefind->sink, event);
}
}
static void
@ -899,14 +976,184 @@ gst_type_find_element_activate_src_mode (GstPad * pad, GstObject * parent,
return res;
}
static gboolean
gst_type_find_element_activate (GstPad * pad, GstObject * parent)
static void
gst_type_find_element_loop (GstPad * pad)
{
GstTypeFindProbability probability = GST_TYPE_FIND_NONE;
GstTypeFindElement *typefind;
GstFlowReturn ret = GST_FLOW_OK;
typefind = GST_TYPE_FIND_ELEMENT (GST_PAD_PARENT (pad));
if (typefind->mode == MODE_TYPEFIND) {
GstPad *peer;
GstCaps *found_caps = NULL;
GstTypeFindProbability probability = GST_TYPE_FIND_NONE;
GST_DEBUG_OBJECT (typefind, "find type in pull mode");
peer = gst_pad_get_peer (pad);
if (peer) {
gint64 size;
gchar *ext;
if (!gst_pad_query_duration (peer, GST_FORMAT_BYTES, &size)) {
GST_WARNING_OBJECT (typefind, "Could not query upstream length!");
gst_object_unref (peer);
ret = GST_FLOW_ERROR;
goto pause;
}
/* the size if 0, we cannot continue */
if (size == 0) {
/* keep message in sync with message in sink event handler */
GST_ELEMENT_ERROR (typefind, STREAM, TYPE_NOT_FOUND,
(_("Stream contains no data.")), ("Can't typefind empty stream"));
gst_object_unref (peer);
ret = GST_FLOW_ERROR;
goto pause;
}
ext = gst_type_find_get_extension (typefind, pad);
found_caps =
gst_type_find_helper_get_range (GST_OBJECT_CAST (peer),
GST_OBJECT_PARENT (peer),
(GstTypeFindHelperGetRangeFunction) (GST_PAD_GETRANGEFUNC (peer)),
(guint64) size, ext, &probability);
g_free (ext);
GST_DEBUG ("Found caps %" GST_PTR_FORMAT, found_caps);
gst_object_unref (peer);
}
if (!found_caps || probability < typefind->min_probability) {
GST_DEBUG ("Trying to guess using extension");
found_caps =
gst_type_find_guess_by_extension (typefind, pad, &probability);
}
if (!found_caps || probability < typefind->min_probability) {
GST_ELEMENT_ERROR (typefind, STREAM, TYPE_NOT_FOUND, (NULL), (NULL));
gst_caps_replace (&found_caps, NULL);
ret = GST_FLOW_ERROR;
goto pause;
}
GST_DEBUG ("Emiting found caps %" GST_PTR_FORMAT, found_caps);
g_signal_emit (typefind, gst_type_find_element_signals[HAVE_TYPE],
0, probability, found_caps);
typefind->mode = MODE_NORMAL;
} else if (typefind->mode == MODE_NORMAL) {
GstBuffer *outbuf;
if (typefind->need_segment) {
typefind->need_segment = FALSE;
gst_pad_push_event (typefind->src,
gst_event_new_segment (&typefind->segment));
}
/* Pull 4k blocks and send downstream */
ret = gst_pad_pull_range (typefind->sink, typefind->offset, 4096, &outbuf);
if (ret != GST_FLOW_OK)
goto pause;
typefind->offset += 4096;
ret = gst_pad_push (typefind->src, outbuf);
if (ret != GST_FLOW_OK)
goto pause;
} else {
/* Error out */
ret = GST_FLOW_ERROR;
goto pause;
}
return;
pause:
{
const gchar *reason = gst_flow_get_name (ret);
gboolean push_eos = FALSE;
GST_LOG_OBJECT (typefind, "pausing task, reason %s", reason);
gst_pad_pause_task (typefind->sink);
if (ret == GST_FLOW_EOS) {
/* perform EOS logic */
if (typefind->segment.flags & GST_SEEK_FLAG_SEGMENT) {
gint64 stop;
/* for segment playback we need to post when (in stream time)
* we stopped, this is either stop (when set) or the duration. */
if ((stop = typefind->segment.stop) == -1)
stop = typefind->offset;
GST_LOG_OBJECT (typefind, "Sending segment done, at end of segment");
gst_element_post_message (GST_ELEMENT (typefind),
gst_message_new_segment_done (GST_OBJECT (typefind),
GST_FORMAT_BYTES, stop));
} else {
push_eos = TRUE;
}
} else if (ret == GST_FLOW_NOT_LINKED || ret < GST_FLOW_EOS) {
/* for fatal errors we post an error message */
GST_ELEMENT_ERROR (typefind, STREAM, FAILED, (NULL),
("stream stopped, reason %s", reason));
push_eos = TRUE;
}
if (push_eos) {
/* send EOS, and prevent hanging if no streams yet */
GST_LOG_OBJECT (typefind, "Sending EOS, at end of stream");
gst_pad_push_event (typefind->src, gst_event_new_eos ());
}
return;
}
}
static gboolean
gst_type_find_element_activate_sink_mode (GstPad * pad, GstObject * parent,
GstPadMode mode, gboolean active)
{
GstTypeFindElement *typefind;
typefind = GST_TYPE_FIND_ELEMENT (parent);
switch (mode) {
case GST_PAD_MODE_PULL:
if (active) {
gst_segment_init (&typefind->segment, GST_FORMAT_BYTES);
typefind->need_segment = TRUE;
typefind->offset = 0;
gst_pad_start_task (pad, (GstTaskFunction) gst_type_find_element_loop,
pad);
} else {
gst_pad_stop_task (pad);
}
return TRUE;
break;
case GST_PAD_MODE_PUSH:
if (active)
start_typefinding (typefind);
else
stop_typefinding (typefind);
return TRUE;
break;
default:
return FALSE;
}
}
static gboolean
gst_type_find_element_activate_sink (GstPad * pad, GstObject * parent)
{
GstTypeFindElement *typefind;
GstQuery *query;
gboolean pull_mode;
GstCaps *found_caps = NULL;
GstTypeFindProbability probability = GST_TYPE_FIND_NONE;
typefind = GST_TYPE_FIND_ELEMENT (parent);
@ -916,23 +1163,15 @@ gst_type_find_element_activate (GstPad * pad, GstObject * parent)
found_caps = gst_caps_ref (typefind->force_caps);
probability = GST_TYPE_FIND_MAXIMUM;
GST_OBJECT_UNLOCK (typefind);
goto done;
GST_DEBUG ("Emiting found caps %" GST_PTR_FORMAT, found_caps);
g_signal_emit (typefind, gst_type_find_element_signals[HAVE_TYPE],
0, probability, found_caps);
typefind->mode = MODE_NORMAL;
goto typefind_push;
}
GST_OBJECT_UNLOCK (typefind);
/* 1. try to activate in pull mode. if not, switch to push and succeed.
2. try to pull type find.
3. deactivate pull mode.
4. src pad might have been activated push by the state change. deactivate.
5. if we didn't find any caps, try getting the uri extension by doing an uri
query.
6. if we didn't find any caps, fail.
7. emit have-type; maybe the app connected the source pad to something.
8. if the sink pad is activated, we are in pull mode. succeed.
otherwise activate both pads in push mode and succeed.
*/
/* 1 */
query = gst_query_new_scheduling ();
if (!gst_pad_peer_query (pad, query)) {
@ -949,108 +1188,10 @@ gst_type_find_element_activate (GstPad * pad, GstObject * parent)
if (!gst_pad_activate_mode (pad, GST_PAD_MODE_PULL, TRUE))
goto typefind_push;
/* 2 */
GST_DEBUG_OBJECT (typefind, "find type in pull mode");
{
GstPad *peer;
peer = gst_pad_get_peer (pad);
if (peer) {
gint64 size;
gchar *ext;
if (!gst_pad_query_duration (peer, GST_FORMAT_BYTES, &size)) {
GST_WARNING_OBJECT (typefind, "Could not query upstream length!");
gst_object_unref (peer);
gst_pad_activate_mode (pad, GST_PAD_MODE_PULL, FALSE);
return FALSE;
}
/* the size if 0, we cannot continue */
if (size == 0) {
/* keep message in sync with message in sink event handler */
GST_ELEMENT_ERROR (typefind, STREAM, TYPE_NOT_FOUND,
(_("Stream contains no data.")), ("Can't typefind empty stream"));
gst_object_unref (peer);
gst_pad_activate_mode (pad, GST_PAD_MODE_PULL, FALSE);
return FALSE;
}
ext = gst_type_find_get_extension (typefind, pad);
found_caps =
gst_type_find_helper_get_range (GST_OBJECT_CAST (peer),
GST_OBJECT_PARENT (peer),
(GstTypeFindHelperGetRangeFunction) (GST_PAD_GETRANGEFUNC (peer)),
(guint64) size, ext, &probability);
g_free (ext);
GST_DEBUG ("Found caps %" GST_PTR_FORMAT, found_caps);
gst_object_unref (peer);
}
}
/* the type find helpers might have triggered setcaps here (due to upstream)
* setting caps on buffers, which emits typefound signal and an element
* could have been linked and have its pads activated
*
* If we deactivate the pads in the following steps we might mess up
* downstream element. We should prevent that.
*/
if (typefind->mode == MODE_NORMAL) {
/* this means we already emitted typefound */
GST_DEBUG ("Already managed to typefind !");
goto really_done;
}
/* 3 */
GST_DEBUG ("Deactivate pull mode");
gst_pad_activate_mode (pad, GST_PAD_MODE_PULL, FALSE);
#if 0
/* 4 */
GST_DEBUG ("Deactivate push mode mode");
gst_pad_activate_mode (typefind->src, GST_PAD_MODE_PUSH, FALSE);
#endif
/* 5 */
if (!found_caps || probability < typefind->min_probability) {
GST_DEBUG ("Trying to guess using extension");
found_caps = gst_type_find_guess_by_extension (typefind, pad, &probability);
}
/* 6 */
if (!found_caps || probability < typefind->min_probability) {
GST_ELEMENT_ERROR (typefind, STREAM, TYPE_NOT_FOUND, (NULL), (NULL));
gst_caps_replace (&found_caps, NULL);
return FALSE;
}
done:
/* 7 */
GST_DEBUG ("Emiting found caps %" GST_PTR_FORMAT, found_caps);
g_signal_emit (typefind, gst_type_find_element_signals[HAVE_TYPE],
0, probability, found_caps);
typefind->mode = MODE_NORMAL;
really_done:
gst_caps_unref (found_caps);
/* 8 */
if (gst_pad_is_active (pad))
return TRUE;
else {
gboolean ret;
GST_DEBUG ("Activating in push mode");
ret = gst_pad_activate_mode (typefind->src, GST_PAD_MODE_PUSH, TRUE);
ret &= gst_pad_activate_mode (pad, GST_PAD_MODE_PUSH, TRUE);
return ret;
}
typefind_push:
{
start_typefinding (typefind);
return gst_pad_activate_mode (pad, GST_PAD_MODE_PUSH, TRUE);
}
}

View file

@ -59,6 +59,11 @@ struct _GstTypeFindElement {
GList * cached_events;
GstCaps * force_caps;
/* Only used when driving the pipeline */
gboolean need_segment;
GstSegment segment;
guint64 offset;
};
struct _GstTypeFindElementClass {