gst/: Implement gst_pad_pause/start/stop_task(), take STREAM lock in task function.

Original commit message from CVS:
* gst/base/gstadapter.c: (gst_adapter_peek), (gst_adapter_flush):
* gst/base/gstbasesink.c: (gst_basesink_preroll_queue_push),
(gst_basesink_finish_preroll), (gst_basesink_chain),
(gst_basesink_loop), (gst_basesink_activate),
(gst_basesink_change_state):
* gst/base/gstbasesrc.c: (gst_basesrc_do_seek),
(gst_basesrc_get_range), (gst_basesrc_loop),
(gst_basesrc_activate):
* gst/elements/gsttee.c: (gst_tee_sink_activate):
* gst/gstpad.c: (gst_pad_dispose), (gst_real_pad_class_init),
(gst_real_pad_init), (gst_real_pad_set_property),
(gst_real_pad_get_property), (gst_pad_set_active),
(gst_pad_is_active), (gst_pad_get_query_types), (gst_pad_unlink),
(gst_pad_link_prepare), (gst_pad_link), (gst_pad_get_real_parent),
(gst_real_pad_get_caps_unlocked), (gst_pad_peer_get_caps),
(gst_pad_accept_caps), (gst_pad_get_peer), (gst_pad_realize),
(gst_pad_event_default_dispatch), (gst_pad_event_default),
(gst_pad_dispatcher), (gst_pad_query), (gst_real_pad_dispose),
(gst_pad_save_thyself), (handle_pad_block), (gst_pad_chain),
(gst_pad_push), (gst_pad_get_range), (gst_pad_pull_range),
(gst_pad_send_event), (gst_pad_start_task), (gst_pad_pause_task),
(gst_pad_stop_task):
* gst/gstpad.h:
* gst/gstqueue.c: (gst_queue_handle_sink_event), (gst_queue_chain),
(gst_queue_loop), (gst_queue_src_activate):
* gst/gsttask.c: (gst_task_init), (gst_task_set_lock),
(gst_task_get_state):
* gst/gsttask.h:
* gst/schedulers/threadscheduler.c:
(gst_thread_scheduler_task_start), (gst_thread_scheduler_func):
Implement gst_pad_pause/start/stop_task(), take STREAM lock
in task function.
Remove ACTIVE pad flag, use FLUSHING everywhere
Added _pad_chain(), _pad_get_range() to call chain/getrange
functions.
Add locks around IS_FLUSHING when reading.
Take STREAM lock in chain(), get_range() functions so plugins
don't need to take it anymore.
This commit is contained in:
Wim Taymans 2005-05-25 11:50:11 +00:00
parent 32f9787436
commit add280cd10
16 changed files with 521 additions and 477 deletions

View file

@ -1,3 +1,46 @@
2005-05-25 Wim Taymans <wim@fluendo.com>
* gst/base/gstadapter.c: (gst_adapter_peek), (gst_adapter_flush):
* gst/base/gstbasesink.c: (gst_basesink_preroll_queue_push),
(gst_basesink_finish_preroll), (gst_basesink_chain),
(gst_basesink_loop), (gst_basesink_activate),
(gst_basesink_change_state):
* gst/base/gstbasesrc.c: (gst_basesrc_do_seek),
(gst_basesrc_get_range), (gst_basesrc_loop),
(gst_basesrc_activate):
* gst/elements/gsttee.c: (gst_tee_sink_activate):
* gst/gstpad.c: (gst_pad_dispose), (gst_real_pad_class_init),
(gst_real_pad_init), (gst_real_pad_set_property),
(gst_real_pad_get_property), (gst_pad_set_active),
(gst_pad_is_active), (gst_pad_get_query_types), (gst_pad_unlink),
(gst_pad_link_prepare), (gst_pad_link), (gst_pad_get_real_parent),
(gst_real_pad_get_caps_unlocked), (gst_pad_peer_get_caps),
(gst_pad_accept_caps), (gst_pad_get_peer), (gst_pad_realize),
(gst_pad_event_default_dispatch), (gst_pad_event_default),
(gst_pad_dispatcher), (gst_pad_query), (gst_real_pad_dispose),
(gst_pad_save_thyself), (handle_pad_block), (gst_pad_chain),
(gst_pad_push), (gst_pad_get_range), (gst_pad_pull_range),
(gst_pad_send_event), (gst_pad_start_task), (gst_pad_pause_task),
(gst_pad_stop_task):
* gst/gstpad.h:
* gst/gstqueue.c: (gst_queue_handle_sink_event), (gst_queue_chain),
(gst_queue_loop), (gst_queue_src_activate):
* gst/gsttask.c: (gst_task_init), (gst_task_set_lock),
(gst_task_get_state):
* gst/gsttask.h:
* gst/schedulers/threadscheduler.c:
(gst_thread_scheduler_task_start), (gst_thread_scheduler_func):
Implement gst_pad_pause/start/stop_task(), take STREAM lock
in task function.
Remove ACTIVE pad flag, use FLUSHING everywhere
Added _pad_chain(), _pad_get_range() to call chain/getrange
functions.
Add locks around IS_FLUSHING when reading.
Take STREAM lock in chain(), get_range() functions so plugins
don't need to take it anymore.
2005-05-25 Wim Taymans <wim@fluendo.com>
* tools/gst-launch.c: (event_loop):

View file

@ -160,7 +160,7 @@ gst_adapter_peek (GstAdapter * adapter, guint size)
if (adapter->assembled_size < size) {
adapter->assembled_size = (size / DEFAULT_SIZE + 1) * DEFAULT_SIZE;
GST_DEBUG_OBJECT (adapter, "setting size of internal buffer to %u\n",
GST_DEBUG_OBJECT (adapter, "setting size of internal buffer to %u",
adapter->assembled_size);
adapter->assembled_data =
g_realloc (adapter->assembled_data, adapter->assembled_size);
@ -198,7 +198,7 @@ gst_adapter_flush (GstAdapter * adapter, guint flush)
g_return_if_fail (flush > 0);
g_return_if_fail (flush <= adapter->size);
GST_LOG_OBJECT (adapter, "flushing %u bytes\n", flush);
GST_LOG_OBJECT (adapter, "flushing %u bytes", flush);
adapter->size -= flush;
adapter->assembled_len = 0;
while (flush > 0) {

View file

@ -378,6 +378,8 @@ gst_basesink_preroll_queue_push (GstBaseSink * basesink, GstPad * pad,
if (basesink->preroll_queue->length == 0) {
GstBaseSinkClass *bclass = GST_BASESINK_GET_CLASS (basesink);
GST_DEBUG ("preroll buffer with TS: %" GST_TIME_FORMAT,
GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)));
if (bclass->preroll)
bclass->preroll (basesink, buffer);
}
@ -448,7 +450,7 @@ PrerollReturn
gst_basesink_finish_preroll (GstBaseSink * basesink, GstPad * pad,
GstBuffer * buffer)
{
gboolean usable;
gboolean flushing;
DEBUG ("finish preroll %p <\n", basesink);
/* lock order is important */
@ -461,14 +463,14 @@ gst_basesink_finish_preroll (GstBaseSink * basesink, GstPad * pad,
gst_element_commit_state (GST_ELEMENT (basesink));
GST_STATE_UNLOCK (basesink);
GST_LOCK (pad);
usable = !GST_RPAD_IS_FLUSHING (pad) && GST_RPAD_IS_ACTIVE (pad);
GST_UNLOCK (pad);
if (!usable)
goto unusable;
gst_basesink_preroll_queue_push (basesink, pad, buffer);
GST_LOCK (pad);
flushing = GST_RPAD_IS_FLUSHING (pad);
GST_UNLOCK (pad);
if (flushing)
goto flushing;
if (basesink->need_preroll)
goto still_queueing;
@ -490,7 +492,7 @@ no_preroll:
GST_STATE_UNLOCK (basesink);
return PREROLL_PLAYING;
}
unusable:
flushing:
{
GST_DEBUG ("pad is flushing");
GST_PREROLL_UNLOCK (pad);
@ -726,12 +728,8 @@ gst_basesink_chain (GstPad * pad, GstBuffer * buf)
g_assert (GST_BASESINK (GST_OBJECT_PARENT (pad))->pad_mode ==
GST_ACTIVATE_PUSH);
GST_STREAM_LOCK (pad);
result = gst_basesink_chain_unlocked (pad, buf);
GST_STREAM_UNLOCK (pad);
return result;
}
@ -748,8 +746,6 @@ gst_basesink_loop (GstPad * pad)
g_assert (basesink->pad_mode == GST_ACTIVATE_PULL);
GST_STREAM_LOCK (pad);
result = gst_pad_pull_range (pad, basesink->offset, DEFAULT_SIZE, &buf);
if (result != GST_FLOW_OK)
goto paused;
@ -759,12 +755,10 @@ gst_basesink_loop (GstPad * pad)
goto paused;
/* default */
GST_STREAM_UNLOCK (pad);
return;
paused:
gst_task_pause (GST_RPAD_TASK (pad));
GST_STREAM_UNLOCK (pad);
gst_pad_pause_task (pad);
return;
}
@ -787,16 +781,8 @@ gst_basesink_activate (GstPad * pad, GstActivateMode mode)
/* if we have a scheduler we can start the task */
g_return_val_if_fail (basesink->has_loop, FALSE);
gst_pad_peer_set_active (pad, mode);
if (GST_ELEMENT_SCHEDULER (basesink)) {
GST_STREAM_LOCK (pad);
GST_RPAD_TASK (pad) =
gst_scheduler_create_task (GST_ELEMENT_SCHEDULER (basesink),
(GstTaskFunction) gst_basesink_loop, pad);
gst_task_start (GST_RPAD_TASK (pad));
GST_STREAM_UNLOCK (pad);
result = TRUE;
}
result =
gst_pad_start_task (pad, (GstTaskFunction) gst_basesink_loop, pad);
break;
case GST_ACTIVATE_NONE:
/* step 1, unblock clock sync (if any) or any other blocking thing */
@ -816,16 +802,7 @@ gst_basesink_activate (GstPad * pad, GstActivateMode mode)
GST_PREROLL_UNLOCK (pad);
/* step 2, make sure streaming finishes */
GST_STREAM_LOCK (pad);
/* step 3, stop the task */
if (GST_RPAD_TASK (pad)) {
gst_task_stop (GST_RPAD_TASK (pad));
gst_object_unref (GST_OBJECT (GST_RPAD_TASK (pad)));
GST_RPAD_TASK (pad) = NULL;
}
GST_STREAM_UNLOCK (pad);
result = TRUE;
result = gst_pad_stop_task (pad);
break;
}
basesink->pad_mode = mode;
@ -911,9 +888,6 @@ gst_basesink_change_state (GstElement * element)
basesink->have_preroll = FALSE;
GST_PREROLL_UNLOCK (basesink->sinkpad);
/* make sure the element is finished processing */
GST_STREAM_LOCK (basesink->sinkpad);
GST_STREAM_UNLOCK (basesink->sinkpad);
/* clear EOS state */
basesink->eos = FALSE;
break;

View file

@ -333,9 +333,8 @@ gst_basesrc_do_seek (GstBaseSrc * src, GstEvent * event)
}
/* and restart the task */
if (GST_RPAD_TASK (src->srcpad)) {
gst_task_start (GST_RPAD_TASK (src->srcpad));
}
gst_pad_start_task (src->srcpad, (GstTaskFunction) gst_basesrc_loop,
src->srcpad);
GST_STREAM_UNLOCK (src->srcpad);
gst_event_unref (event);
@ -447,7 +446,7 @@ gst_basesrc_get_property (GObject * object, guint prop_id, GValue * value,
}
static GstFlowReturn
gst_basesrc_get_range_unlocked (GstPad * pad, guint64 offset, guint length,
gst_basesrc_get_range (GstPad * pad, guint64 offset, guint length,
GstBuffer ** buf)
{
GstFlowReturn ret;
@ -499,21 +498,6 @@ unexpected_length:
}
}
static GstFlowReturn
gst_basesrc_get_range (GstPad * pad, guint64 offset, guint length,
GstBuffer ** ret)
{
GstFlowReturn fret;
GST_STREAM_LOCK (pad);
fret = gst_basesrc_get_range_unlocked (pad, offset, length, ret);
GST_STREAM_UNLOCK (pad);
return fret;
}
static gboolean
gst_basesrc_check_get_range (GstPad * pad)
{
@ -538,9 +522,7 @@ gst_basesrc_loop (GstPad * pad)
src = GST_BASESRC (GST_OBJECT_PARENT (pad));
GST_STREAM_LOCK (pad);
ret = gst_basesrc_get_range_unlocked (pad, src->offset, src->blocksize, &buf);
ret = gst_basesrc_get_range (pad, src->offset, src->blocksize, &buf);
if (ret != GST_FLOW_OK)
goto eos;
@ -550,22 +532,19 @@ gst_basesrc_loop (GstPad * pad)
if (ret != GST_FLOW_OK)
goto pause;
GST_STREAM_UNLOCK (pad);
return;
eos:
{
GST_DEBUG_OBJECT (src, "going to EOS");
gst_task_pause (GST_RPAD_TASK (pad));
gst_pad_pause_task (pad);
gst_pad_push_event (pad, gst_event_new (GST_EVENT_EOS));
GST_STREAM_UNLOCK (pad);
return;
}
pause:
{
GST_DEBUG_OBJECT (src, "pausing task");
gst_task_pause (GST_RPAD_TASK (pad));
GST_STREAM_UNLOCK (pad);
gst_pad_pause_task (pad);
return;
}
}
@ -733,17 +712,8 @@ gst_basesrc_activate (GstPad * pad, GstActivateMode mode)
result = FALSE;
switch (mode) {
case GST_ACTIVATE_PUSH:
/* if we have a scheduler we can start the task */
if (GST_ELEMENT_SCHEDULER (basesrc)) {
GST_STREAM_LOCK (pad);
GST_RPAD_TASK (pad) =
gst_scheduler_create_task (GST_ELEMENT_SCHEDULER (basesrc),
(GstTaskFunction) gst_basesrc_loop, pad);
gst_task_start (GST_RPAD_TASK (pad));
GST_STREAM_UNLOCK (pad);
result = TRUE;
}
result =
gst_pad_start_task (pad, (GstTaskFunction) gst_basesrc_loop, pad);
break;
case GST_ACTIVATE_PULL:
result = TRUE;
@ -753,16 +723,7 @@ gst_basesrc_activate (GstPad * pad, GstActivateMode mode)
gst_basesrc_unlock (basesrc);
/* step 2, make sure streaming finishes */
GST_STREAM_LOCK (pad);
/* step 3, stop the task */
if (GST_RPAD_TASK (pad)) {
gst_task_stop (GST_RPAD_TASK (pad));
gst_object_unref (GST_OBJECT (GST_RPAD_TASK (pad)));
GST_RPAD_TASK (pad) = NULL;
}
GST_STREAM_UNLOCK (pad);
result = TRUE;
result = gst_pad_stop_task (pad);
break;
}
return result;

View file

@ -370,27 +370,10 @@ gst_tee_sink_activate (GstPad * pad, GstActivateMode mode)
break;
case GST_ACTIVATE_PULL:
g_return_val_if_fail (tee->has_sink_loop, FALSE);
if (GST_ELEMENT_SCHEDULER (tee)) {
GST_STREAM_LOCK (pad);
GST_RPAD_TASK (pad) =
gst_scheduler_create_task (GST_ELEMENT_SCHEDULER (tee),
(GstTaskFunction) gst_tee_loop, pad);
gst_pad_start_task (pad);
GST_STREAM_UNLOCK (pad);
result = TRUE;
}
result = gst_pad_start_task (pad, (GstTaskFunction) gst_tee_loop, pad);
break;
case GST_ACTIVATE_NONE:
GST_STREAM_LOCK (pad);
if (GST_RPAD_TASK (pad)) {
gst_pad_stop_task (pad);
gst_object_unref (GST_OBJECT (GST_RPAD_TASK (pad)));
GST_RPAD_TASK (pad) = NULL;
}
GST_STREAM_UNLOCK (pad);
result = TRUE;
result = gst_pad_stop_task (pad);
break;
}
tee->sink_mode = mode;

View file

@ -126,7 +126,7 @@ gst_pad_init (GstPad * pad)
static void
gst_pad_dispose (GObject * object)
{
GstPad *pad = GST_PAD (object);
GstPad *pad = GST_PAD_CAST (object);
gst_pad_set_pad_template (pad, NULL);
/* FIXME, we have links to many other things like caps
@ -152,8 +152,7 @@ enum
{
REAL_ARG_0,
REAL_ARG_CAPS,
REAL_ARG_ACTIVE
/* FILL ME */
/* FILL ME */
};
static void gst_real_pad_class_init (GstRealPadClass * klass);
@ -219,9 +218,6 @@ gst_real_pad_class_init (GstRealPadClass * klass)
G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRealPadClass, request_link), NULL,
NULL, gst_marshal_VOID__OBJECT, G_TYPE_NONE, 0);
g_object_class_install_property (G_OBJECT_CLASS (klass), REAL_ARG_ACTIVE,
g_param_spec_boolean ("active", "Active", "Whether the pad is active.",
TRUE, G_PARAM_READWRITE));
g_object_class_install_property (G_OBJECT_CLASS (klass), REAL_ARG_CAPS,
g_param_spec_boxed ("caps", "Caps", "The capabilities of the pad",
GST_TYPE_CAPS, G_PARAM_READABLE));
@ -251,7 +247,7 @@ gst_real_pad_init (GstRealPad * pad)
pad->queryfunc = gst_pad_query_default;
pad->intlinkfunc = gst_pad_get_internal_links_default;
GST_FLAG_UNSET (pad, GST_PAD_ACTIVE);
GST_RPAD_UNSET_FLUSHING (pad);
pad->preroll_lock = g_mutex_new ();
pad->preroll_cond = g_cond_new ();
@ -271,10 +267,6 @@ gst_real_pad_set_property (GObject * object, guint prop_id,
g_return_if_fail (GST_IS_PAD (object));
switch (prop_id) {
case REAL_ARG_ACTIVE:
g_warning ("FIXME: not useful any more!!!");
gst_pad_set_active (GST_PAD (object), g_value_get_boolean (value));
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
@ -288,9 +280,6 @@ gst_real_pad_get_property (GObject * object, guint prop_id,
g_return_if_fail (GST_IS_PAD (object));
switch (prop_id) {
case REAL_ARG_ACTIVE:
g_value_set_boolean (value, GST_FLAG_IS_SET (object, GST_PAD_ACTIVE));
break;
case REAL_ARG_CAPS:
g_value_set_boxed (value, GST_PAD_CAPS (GST_REAL_PAD (object)));
break;
@ -451,7 +440,7 @@ gboolean
gst_pad_set_active (GstPad * pad, GstActivateMode mode)
{
GstRealPad *realpad;
gboolean old;
GstActivateMode old;
GstPadActivateFunction activatefunc;
gboolean active;
@ -460,17 +449,17 @@ gst_pad_set_active (GstPad * pad, GstActivateMode mode)
GST_PAD_REALIZE_AND_LOCK (pad, realpad, lost_ghostpad);
active = GST_PAD_MODE_ACTIVATE (mode);
old = GST_PAD_IS_ACTIVE (realpad);
old = GST_RPAD_ACTIVATE_MODE (realpad);
/* if nothing changed, we can just exit */
if (G_UNLIKELY (old == active))
if (G_UNLIKELY (old == mode))
goto was_ok;
/* make sure data is disallowed when going inactive */
if (!active) {
GST_CAT_DEBUG (GST_CAT_PADS, "de-activating pad %s:%s",
GST_DEBUG_PAD_NAME (realpad));
GST_FLAG_UNSET (realpad, GST_PAD_ACTIVE);
GST_RPAD_SET_FLUSHING (realpad);
/* unlock blocked pads so element can resume and stop */
GST_PAD_BLOCK_SIGNAL (realpad);
}
@ -510,16 +499,24 @@ gst_pad_set_active (GstPad * pad, GstActivateMode mode)
GST_LOCK (realpad);
if (result == FALSE)
goto activate_error;
/* store the mode */
GST_RPAD_ACTIVATE_MODE (realpad) = mode;
}
/* when going to active allow data passing now */
if (active) {
GST_CAT_DEBUG (GST_CAT_PADS, "activating pad %s:%s in mode %d",
GST_DEBUG_PAD_NAME (realpad), mode);
GST_FLAG_SET (realpad, GST_PAD_ACTIVE);
}
GST_UNLOCK (realpad);
GST_RPAD_UNSET_FLUSHING (realpad);
GST_UNLOCK (realpad);
} else {
GST_UNLOCK (realpad);
/* and make streaming finish */
GST_STREAM_LOCK (realpad);
GST_STREAM_UNLOCK (realpad);
}
return TRUE;
was_ok:
@ -607,7 +604,7 @@ gst_pad_is_active (GstPad * pad)
g_return_val_if_fail (GST_IS_PAD (pad), FALSE);
GST_PAD_REALIZE_AND_LOCK (pad, realpad, lost_ghostpad);
result = GST_FLAG_IS_SET (realpad, GST_PAD_ACTIVE);
result = GST_PAD_MODE_ACTIVATE (GST_RPAD_ACTIVATE_MODE (realpad));
GST_UNLOCK (realpad);
return result;
@ -933,7 +930,7 @@ gst_pad_get_query_types (GstPad * pad)
if (G_UNLIKELY ((func = GST_RPAD_QUERYTYPEFUNC (rpad)) == NULL))
goto no_func;
return func (GST_PAD (rpad));
return func (GST_PAD_CAST (rpad));
no_func:
{
@ -1187,10 +1184,10 @@ gst_pad_unlink (GstPad * srcpad, GstPad * sinkpad)
goto not_linked_together;
if (GST_RPAD_UNLINKFUNC (realsrc)) {
GST_RPAD_UNLINKFUNC (realsrc) (GST_PAD (realsrc));
GST_RPAD_UNLINKFUNC (realsrc) (GST_PAD_CAST (realsrc));
}
if (GST_RPAD_UNLINKFUNC (realsink)) {
GST_RPAD_UNLINKFUNC (realsink) (GST_PAD (realsink));
GST_RPAD_UNLINKFUNC (realsink) (GST_PAD_CAST (realsink));
}
/* first clear peers */
@ -1329,7 +1326,8 @@ gst_pad_link_prepare (GstPad * srcpad, GstPad * sinkpad,
if (G_UNLIKELY (GST_RPAD_PEER (realsink) != NULL))
goto sink_was_linked;
if ((GST_PAD (realsrc) != srcpad) || (GST_PAD (realsink) != sinkpad)) {
if ((GST_PAD_CAST (realsrc) != srcpad)
|| (GST_PAD_CAST (realsink) != sinkpad)) {
GST_CAT_INFO (GST_CAT_PADS, "*actually* linking %s:%s and %s:%s",
GST_DEBUG_PAD_NAME (realsrc), GST_DEBUG_PAD_NAME (realsink));
}
@ -1427,12 +1425,14 @@ gst_pad_link (GstPad * srcpad, GstPad * sinkpad)
if (GST_RPAD_LINKFUNC (realsrc)) {
/* this one will call the peer link function */
result =
GST_RPAD_LINKFUNC (realsrc) (GST_PAD (realsrc), GST_PAD (realsink));
GST_RPAD_LINKFUNC (realsrc) (GST_PAD_CAST (realsrc),
GST_PAD_CAST (realsink));
} else if (GST_RPAD_LINKFUNC (realsink)) {
/* if no source link function, we need to call the sink link
* function ourselves. */
result =
GST_RPAD_LINKFUNC (realsink) (GST_PAD (realsink), GST_PAD (realsrc));
GST_RPAD_LINKFUNC (realsink) (GST_PAD_CAST (realsink),
GST_PAD_CAST (realsrc));
} else {
result = GST_PAD_LINK_OK;
}
@ -1530,7 +1530,7 @@ gst_pad_get_real_parent (GstPad * pad)
GST_PAD_REALIZE_AND_LOCK (pad, realpad, lost_ghostpad);
element = GST_PAD_PARENT (realpad);
if (element)
gst_object_ref (GST_OBJECT (element));
gst_object_ref (GST_OBJECT_CAST (element));
GST_UNLOCK (realpad);
return element;
@ -1586,7 +1586,7 @@ gst_real_pad_get_caps_unlocked (GstRealPad * realpad)
GST_FLAG_SET (realpad, GST_PAD_IN_GETCAPS);
GST_UNLOCK (realpad);
result = GST_RPAD_GETCAPSFUNC (realpad) (GST_PAD (realpad));
result = GST_RPAD_GETCAPSFUNC (realpad) (GST_PAD_CAST (realpad));
GST_LOCK (realpad);
GST_FLAG_UNSET (realpad, GST_PAD_IN_GETCAPS);
@ -1725,12 +1725,12 @@ gst_pad_peer_get_caps (GstPad * pad)
if (G_UNLIKELY (GST_RPAD_IS_IN_GETCAPS (peerpad)))
goto was_dispatching;
gst_object_ref (GST_OBJECT (peerpad));
gst_object_ref (GST_OBJECT_CAST (peerpad));
GST_UNLOCK (realpad);
result = gst_pad_get_caps (GST_PAD_CAST (peerpad));
gst_object_unref (GST_OBJECT (peerpad));
gst_object_unref (GST_OBJECT_CAST (peerpad));
return result;
@ -2029,7 +2029,7 @@ gst_pad_get_peer (GstPad * pad)
GST_PAD_REALIZE_AND_LOCK (pad, realpad, lost_ghostpad);
result = GST_RPAD_PEER (realpad);
if (result)
gst_object_ref (GST_OBJECT (result));
gst_object_ref (GST_OBJECT_CAST (result));
GST_UNLOCK (realpad);
return GST_PAD_CAST (result);
@ -2072,11 +2072,11 @@ gst_pad_realize (GstPad * pad)
GST_LOCK (pad);
result = GST_PAD_REALIZE (pad);
if (result && pad != GST_PAD_CAST (result)) {
gst_object_ref (GST_OBJECT (result));
gst_object_ref (GST_OBJECT_CAST (result));
GST_UNLOCK (pad);
/* no other thread could dispose this since we
* hold at least one ref */
gst_object_unref (GST_OBJECT (pad));
gst_object_unref (GST_OBJECT_CAST (pad));
} else {
GST_UNLOCK (pad);
}
@ -2385,7 +2385,7 @@ gst_pad_event_default_dispatch (GstPad * pad, GstEvent * event)
orig = pads = gst_pad_get_internal_links (pad);
while (pads) {
GstPad *eventpad = GST_PAD (pads->data);
GstPad *eventpad = GST_PAD_CAST (pads->data);
pads = g_list_next (pads);
@ -2444,7 +2444,7 @@ gst_pad_event_default (GstPad * pad, GstEvent * event)
if (GST_RPAD_TASK (rpad)) {
GST_DEBUG_OBJECT (rpad, "pausing task because of eos");
gst_task_pause (GST_RPAD_TASK (rpad));
gst_pad_pause_task (GST_PAD_CAST (rpad));
}
}
default:
@ -2484,7 +2484,7 @@ gst_pad_dispatcher (GstPad * pad, GstPadDispatcherFunction dispatch,
GstRealPad *int_peer = GST_RPAD_PEER (int_rpad);
if (int_peer) {
res = dispatch (GST_PAD (int_peer), data);
res = dispatch (GST_PAD_CAST (int_peer), data);
if (res)
break;
}
@ -2561,7 +2561,7 @@ gst_real_pad_dispose (GObject * object)
GstPad *pad;
GstRealPad *rpad;
pad = GST_PAD (object);
pad = GST_PAD_CAST (object);
rpad = GST_REAL_PAD (object);
/* No linked pad can ever be disposed.
@ -2582,7 +2582,7 @@ gst_real_pad_dispose (GObject * object)
orig = ghostpads = g_list_copy (rpad->ghostpads);
while (ghostpads) {
GstPad *ghostpad = GST_PAD (ghostpads->data);
GstPad *ghostpad = GST_PAD_CAST (ghostpads->data);
if (GST_IS_ELEMENT (GST_OBJECT_PARENT (ghostpad))) {
GstElement *parent = GST_ELEMENT (GST_OBJECT_PARENT (ghostpad));
@ -2738,7 +2738,7 @@ gst_pad_save_thyself (GstObject * object, xmlNodePtr parent)
if (GST_RPAD_PEER (realpad) != NULL) {
gchar *content;
peer = GST_PAD (GST_RPAD_PEER (realpad));
peer = GST_PAD_CAST (GST_RPAD_PEER (realpad));
/* first check to see if the peer's parent's parent is the same */
/* we just save it off */
content = g_strdup_printf ("%s.%s",
@ -2793,7 +2793,7 @@ handle_pad_block (GstRealPad * pad)
"signal block taken on pad %s:%s", GST_DEBUG_PAD_NAME (pad));
/* need to grab extra ref for the callbacks */
gst_object_ref (GST_OBJECT (pad));
gst_object_ref (GST_OBJECT_CAST (pad));
callback = pad->block_callback;
if (callback) {
@ -2820,13 +2820,102 @@ handle_pad_block (GstRealPad * pad)
GST_PAD_BLOCK_SIGNAL (pad);
}
gst_object_unref (GST_OBJECT (pad));
gst_object_unref (GST_OBJECT_CAST (pad));
}
/**********************************************************************
* Data passing functions
*/
/**
* gst_pad_chain:
* @pad: a sink #GstPad.
* @buffer: the #GstBuffer to send.
*
* Chain a buffer to @pad. The pad has to be a GstRealPad.
*
* Returns: a #GstFlowReturn from the pad.
*
* MT safe.
*/
GstFlowReturn
gst_pad_chain (GstPad * pad, GstBuffer * buffer)
{
GstCaps *caps;
gboolean caps_changed;
GstPadChainFunction chainfunc;
GstFlowReturn ret;
g_return_val_if_fail (GST_IS_REAL_PAD (pad), GST_FLOW_ERROR);
g_return_val_if_fail (GST_RPAD_DIRECTION (pad) == GST_PAD_SINK,
GST_FLOW_ERROR);
g_return_val_if_fail (buffer != NULL, GST_FLOW_ERROR);
g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
GST_STREAM_LOCK (pad);
GST_LOCK (pad);
if (G_UNLIKELY (GST_RPAD_IS_FLUSHING (pad)))
goto flushing;
caps = GST_BUFFER_CAPS (buffer);
caps_changed = caps && caps != GST_RPAD_CAPS (pad);
GST_UNLOCK (pad);
/* we got a new datatype on the pad, see if it can handle it */
if (G_UNLIKELY (caps_changed)) {
GST_DEBUG ("caps changed to %" GST_PTR_FORMAT, caps);
if (G_UNLIKELY (!gst_pad_configure_sink (pad, caps)))
goto not_negotiated;
}
/* NOTE: we read the chainfunc unlocked.
* we cannot hold the lock for the pad so we might send
* the data to the wrong function. This is not really a
* problem since functions are assigned at creation time
* and don't change that often... */
if (G_UNLIKELY ((chainfunc = GST_RPAD_CHAINFUNC (pad)) == NULL))
goto no_function;
GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad,
"calling chainfunction &%s of pad %s:%s",
GST_DEBUG_FUNCPTR_NAME (chainfunc), GST_DEBUG_PAD_NAME (pad));
ret = chainfunc (pad, buffer);
GST_STREAM_UNLOCK (pad);
return ret;
/* ERRORS */
flushing:
{
gst_buffer_unref (buffer);
GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad,
"pushing, but pad was flushing");
GST_UNLOCK (pad);
GST_STREAM_UNLOCK (pad);
return GST_FLOW_UNEXPECTED;
}
not_negotiated:
{
gst_buffer_unref (buffer);
GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad,
"pushing buffer but pad did not accept");
GST_STREAM_UNLOCK (pad);
return GST_FLOW_NOT_NEGOTIATED;
}
no_function:
{
gst_buffer_unref (buffer);
GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad,
"pushing, but not chainhandler");
GST_ELEMENT_ERROR (GST_PAD_PARENT (pad), CORE, PAD, (NULL),
("push on pad %s:%s but it has no chainfunction",
GST_DEBUG_PAD_NAME (pad)));
GST_STREAM_UNLOCK (pad);
return GST_FLOW_ERROR;
}
}
/**
* gst_pad_push:
@ -2844,9 +2933,6 @@ gst_pad_push (GstPad * pad, GstBuffer * buffer)
{
GstRealPad *peer;
GstFlowReturn ret;
GstPadChainFunction chainfunc;
GstCaps *caps;
gboolean caps_changed;
g_return_val_if_fail (GST_IS_REAL_PAD (pad), GST_FLOW_ERROR);
g_return_val_if_fail (GST_RPAD_DIRECTION (pad) == GST_PAD_SRC,
@ -2854,7 +2940,6 @@ gst_pad_push (GstPad * pad, GstBuffer * buffer)
g_return_val_if_fail (buffer != NULL, GST_FLOW_ERROR);
g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
GST_LOCK (pad);
while (G_UNLIKELY (GST_RPAD_IS_BLOCKED (pad)))
handle_pad_block (GST_REAL_PAD_CAST (pad));
@ -2862,38 +2947,10 @@ gst_pad_push (GstPad * pad, GstBuffer * buffer)
if (G_UNLIKELY ((peer = GST_RPAD_PEER (pad)) == NULL))
goto not_linked;
if (G_UNLIKELY (!GST_RPAD_IS_ACTIVE (peer)))
goto not_active;
if (G_UNLIKELY (GST_RPAD_IS_FLUSHING (peer)))
goto flushing;
gst_object_ref (GST_OBJECT_CAST (peer));
GST_UNLOCK (pad);
/* FIXME, move capnego this into a base class? */
caps = GST_BUFFER_CAPS (buffer);
caps_changed = caps && caps != GST_RPAD_CAPS (peer);
/* we got a new datatype on the peer pad, see if it can handle it */
if (G_UNLIKELY (caps_changed)) {
GST_DEBUG ("caps changed to %" GST_PTR_FORMAT, caps);
if (G_UNLIKELY (!gst_pad_configure_sink (GST_PAD_CAST (peer), caps)))
goto not_negotiated;
}
/* NOTE: we read the peer chainfunc unlocked.
* we cannot hold the lock for the peer so we might send
* the data to the wrong function. This is not really a
* problem since functions are assigned at creation time
* and don't change that often... */
if (G_UNLIKELY ((chainfunc = peer->chainfunc) == NULL))
goto no_function;
GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad,
"calling chainfunction &%s of peer pad %s:%s",
GST_DEBUG_FUNCPTR_NAME (chainfunc), GST_DEBUG_PAD_NAME (peer));
ret = chainfunc (GST_PAD_CAST (peer), buffer);
ret = gst_pad_chain (GST_PAD_CAST (peer), buffer);
gst_object_unref (GST_OBJECT_CAST (peer));
@ -2908,40 +2965,6 @@ not_linked:
GST_UNLOCK (pad);
return GST_FLOW_NOT_CONNECTED;
}
not_active:
{
gst_buffer_unref (buffer);
GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad,
"pushing, but it was inactive");
GST_UNLOCK (pad);
return GST_FLOW_WRONG_STATE;
}
flushing:
{
gst_buffer_unref (buffer);
GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad,
"pushing, but pad was flushing");
GST_UNLOCK (pad);
return GST_FLOW_UNEXPECTED;
}
not_negotiated:
{
gst_buffer_unref (buffer);
GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad,
"pushing buffer but peer did not accept");
return GST_FLOW_NOT_NEGOTIATED;
}
no_function:
{
gst_buffer_unref (buffer);
GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad,
"pushing, but not chainhandler");
GST_ELEMENT_ERROR (GST_PAD_PARENT (pad), CORE, PAD, (NULL),
("push on pad %s:%s but the peer pad %s:%s has no chainfunction",
GST_DEBUG_PAD_NAME (pad), GST_DEBUG_PAD_NAME (peer)));
gst_object_unref (GST_OBJECT (peer));
return GST_FLOW_ERROR;
}
}
/**
@ -3009,6 +3032,73 @@ not_connected:
}
}
/**
* gst_pad_get_range:
* @pad: a src #GstPad.
* @buffer: a pointer to hold the #GstBuffer.
* @offset: The start offset of the buffer
* @length: The length of the buffer
*
* Calls the getrange function of @pad.
*
* Returns: a #GstFlowReturn from the pad.
*
* MT safe.
*/
GstFlowReturn
gst_pad_get_range (GstPad * pad, guint64 offset, guint size,
GstBuffer ** buffer)
{
GstFlowReturn ret;
GstPadGetRangeFunction getrangefunc;
g_return_val_if_fail (GST_IS_REAL_PAD (pad), GST_FLOW_ERROR);
g_return_val_if_fail (GST_RPAD_DIRECTION (pad) == GST_PAD_SRC,
GST_FLOW_ERROR);
g_return_val_if_fail (buffer != NULL, GST_FLOW_ERROR);
GST_STREAM_LOCK (pad);
GST_LOCK (pad);
if (G_UNLIKELY (GST_RPAD_IS_FLUSHING (pad)))
goto flushing;
GST_UNLOCK (pad);
if (G_UNLIKELY ((getrangefunc = GST_RPAD_GETRANGEFUNC (pad)) == NULL))
goto no_function;
GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad,
"calling getrangefunc %s of peer pad %s:%s, offset %"
G_GUINT64_FORMAT ", size %u",
GST_DEBUG_FUNCPTR_NAME (getrangefunc), GST_DEBUG_PAD_NAME (pad),
offset, size);
ret = getrangefunc (pad, offset, size, buffer);
GST_STREAM_UNLOCK (pad);
return ret;
/* ERRORS */
flushing:
{
GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad,
"pulling range, but pad was flushing");
GST_UNLOCK (pad);
GST_STREAM_UNLOCK (pad);
return GST_FLOW_UNEXPECTED;
}
no_function:
{
GST_ELEMENT_ERROR (GST_PAD_PARENT (pad), CORE, PAD, (NULL),
("pullrange on pad %s:%s but it has no getrangefunction",
GST_DEBUG_PAD_NAME (pad)));
GST_STREAM_UNLOCK (pad);
return GST_FLOW_ERROR;
}
}
/**
* gst_pad_pull_range:
* @pad: a sink #GstPad.
@ -3028,7 +3118,6 @@ gst_pad_pull_range (GstPad * pad, guint64 offset, guint size,
{
GstRealPad *peer;
GstFlowReturn ret;
GstPadGetRangeFunction getrangefunc;
g_return_val_if_fail (GST_IS_REAL_PAD (pad), GST_FLOW_ERROR);
g_return_val_if_fail (GST_RPAD_DIRECTION (pad) == GST_PAD_SINK,
@ -3043,26 +3132,10 @@ gst_pad_pull_range (GstPad * pad, guint64 offset, guint size,
if (G_UNLIKELY ((peer = GST_RPAD_PEER (pad)) == NULL))
goto not_connected;
if (G_UNLIKELY (!GST_RPAD_IS_ACTIVE (peer)))
goto not_active;
if (G_UNLIKELY (GST_RPAD_IS_FLUSHING (peer)))
goto flushing;
gst_object_ref (GST_OBJECT_CAST (peer));
GST_UNLOCK (pad);
/* see note in above function */
if (G_UNLIKELY ((getrangefunc = peer->getrangefunc) == NULL))
goto no_function;
GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad,
"calling getrangefunc %s of peer pad %s:%s, offset %"
G_GUINT64_FORMAT ", size %u",
GST_DEBUG_FUNCPTR_NAME (getrangefunc), GST_DEBUG_PAD_NAME (peer),
offset, size);
ret = getrangefunc (GST_PAD_CAST (peer), offset, size, buffer);
ret = gst_pad_get_range (GST_PAD_CAST (peer), offset, size, buffer);
gst_object_unref (GST_OBJECT_CAST (peer));
@ -3076,28 +3149,6 @@ not_connected:
GST_UNLOCK (pad);
return GST_FLOW_NOT_CONNECTED;
}
not_active:
{
GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad,
"pulling range, but it was inactive");
GST_UNLOCK (pad);
return GST_FLOW_WRONG_STATE;
}
flushing:
{
GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad,
"pulling range, but pad was flushing");
GST_UNLOCK (pad);
return GST_FLOW_UNEXPECTED;
}
no_function:
{
GST_ELEMENT_ERROR (GST_PAD_PARENT (pad), CORE, PAD, (NULL),
("pullrange on pad %s:%s but the peer pad %s:%s has no getrangefunction",
GST_DEBUG_PAD_NAME (pad), GST_DEBUG_PAD_NAME (peer)));
gst_object_unref (GST_OBJECT (peer));
return GST_FLOW_ERROR;
}
}
/**
@ -3105,7 +3156,9 @@ no_function:
* @pad: a #GstPad to push the event to.
* @event: the #GstEvent to send to the pad.
*
* Sends the event to the peer of the given pad.
* Sends the event to the peer of the given pad. This function is
* mainly used by elements to send events to their peer
* elements.
*
* Returns: TRUE if the event was handled.
*
@ -3157,47 +3210,73 @@ gboolean
gst_pad_send_event (GstPad * pad, GstEvent * event)
{
gboolean result = FALSE;
GstRealPad *rpad;
GstRealPad *realpad;
GstPadEventFunction eventfunc;
g_return_val_if_fail (GST_IS_PAD (pad), FALSE);
g_return_val_if_fail (event != NULL, FALSE);
rpad = GST_PAD_REALIZE (pad);
GST_PAD_REALIZE_AND_LOCK (pad, realpad, lost_ghostpad);
if (GST_EVENT_SRC (event) == NULL)
GST_EVENT_SRC (event) = gst_object_ref (GST_OBJECT (rpad));
GST_EVENT_SRC (event) = gst_object_ref (GST_OBJECT_CAST (realpad));
GST_CAT_DEBUG (GST_CAT_EVENT, "have event type %d on pad %s:%s",
GST_EVENT_TYPE (event), GST_DEBUG_PAD_NAME (rpad));
GST_EVENT_TYPE (event), GST_DEBUG_PAD_NAME (realpad));
if (GST_PAD_IS_SINK (pad)) {
if (GST_EVENT_TYPE (event) == GST_EVENT_FLUSH) {
GST_CAT_DEBUG (GST_CAT_EVENT, "have flush event");
GST_LOCK (pad);
if (GST_EVENT_FLUSH_DONE (event)) {
GST_CAT_DEBUG (GST_CAT_EVENT, "clear flush flag");
GST_FLAG_UNSET (pad, GST_PAD_FLUSHING);
} else {
GST_CAT_DEBUG (GST_CAT_EVENT, "set flush flag");
GST_FLAG_SET (pad, GST_PAD_FLUSHING);
}
GST_UNLOCK (pad);
if (GST_PAD_IS_SINK (realpad)) {
switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_FLUSH:
GST_CAT_DEBUG (GST_CAT_EVENT, "have flush event");
if (GST_EVENT_FLUSH_DONE (event)) {
GST_RPAD_UNSET_FLUSHING (realpad);
GST_CAT_DEBUG (GST_CAT_EVENT, "cleared flush flag");
} else {
/* can't even accept a flush begin event when flushing */
if (GST_RPAD_IS_FLUSHING (realpad))
goto flushing;
GST_RPAD_SET_FLUSHING (realpad);
GST_CAT_DEBUG (GST_CAT_EVENT, "set flush flag");
}
break;
default:
if (GST_RPAD_IS_FLUSHING (realpad))
goto flushing;
break;
}
}
if ((eventfunc = GST_RPAD_EVENTFUNC (rpad)) == NULL)
if ((eventfunc = GST_RPAD_EVENTFUNC (realpad)) == NULL)
goto no_function;
result = eventfunc (GST_PAD_CAST (rpad), event);
gst_object_ref (GST_OBJECT_CAST (realpad));
GST_UNLOCK (realpad);
result = eventfunc (GST_PAD_CAST (realpad), event);
gst_object_unref (GST_OBJECT_CAST (realpad));
return result;
/* ERROR handling */
lost_ghostpad:
{
GST_CAT_DEBUG (GST_CAT_EVENT, "lost ghostpad");
gst_event_unref (event);
return FALSE;
}
no_function:
{
g_warning ("pad %s:%s has no event handler, file a bug.",
GST_DEBUG_PAD_NAME (rpad));
GST_DEBUG_PAD_NAME (realpad));
GST_UNLOCK (realpad);
gst_event_unref (event);
return FALSE;
}
flushing:
{
GST_UNLOCK (realpad);
GST_CAT_DEBUG (GST_CAT_EVENT, "received event on flushing pad");
gst_event_unref (event);
return FALSE;
}
@ -3444,24 +3523,107 @@ gst_pad_get_element_private (GstPad * pad)
}
gboolean
gst_pad_start_task (GstPad * pad)
gst_pad_start_task (GstPad * pad, GstTaskFunction func, gpointer data)
{
g_warning ("implement gst_pad_start_task()");
return FALSE;
GstElement *parent;
GstScheduler *sched;
GstTask *task;
g_return_val_if_fail (GST_IS_PAD (pad), FALSE);
g_return_val_if_fail (func != NULL, FALSE);
GST_LOCK (pad);
parent = GST_PAD_PARENT (pad);
if (parent == NULL || !GST_IS_ELEMENT (parent))
goto no_parent;
sched = GST_ELEMENT_SCHEDULER (parent);
if (sched == NULL)
goto no_sched;
task = GST_RPAD_TASK (pad);
if (task == NULL) {
task = gst_scheduler_create_task (sched, func, data);
gst_task_set_lock (task, GST_STREAM_GET_LOCK (pad));
GST_RPAD_TASK (pad) = task;
}
GST_UNLOCK (pad);
gst_task_start (task);
return TRUE;
/* ERRORS */
no_parent:
{
GST_UNLOCK (pad);
GST_DEBUG ("no parent");
return FALSE;
}
no_sched:
{
GST_UNLOCK (pad);
GST_DEBUG ("no scheduler");
return FALSE;
}
}
gboolean
gst_pad_pause_task (GstPad * pad)
{
g_warning ("implement gst_pad_pause_task()");
return FALSE;
GstTask *task;
g_return_val_if_fail (GST_IS_PAD (pad), FALSE);
GST_LOCK (pad);
task = GST_RPAD_TASK (pad);
if (task == NULL)
goto no_task;
gst_task_pause (task);
GST_UNLOCK (pad);
GST_STREAM_LOCK (pad);
GST_STREAM_UNLOCK (pad);
return TRUE;
no_task:
{
GST_UNLOCK (pad);
return TRUE;
}
}
gboolean
gst_pad_stop_task (GstPad * pad)
{
g_warning ("implement gst_pad_stop_task()");
return FALSE;
GstTask *task;
g_return_val_if_fail (GST_IS_PAD (pad), FALSE);
GST_LOCK (pad);
task = GST_RPAD_TASK (pad);
if (task == NULL)
goto no_task;
GST_RPAD_TASK (pad) = NULL;
GST_UNLOCK (pad);
gst_task_stop (task);
GST_STREAM_LOCK (pad);
GST_STREAM_UNLOCK (pad);
gst_object_unref (GST_OBJECT_CAST (task));
return TRUE;
no_task:
{
GST_UNLOCK (pad);
return TRUE;
}
}

View file

@ -174,8 +174,7 @@ typedef enum {
} GstPadDirection;
typedef enum {
GST_PAD_ACTIVE = GST_OBJECT_FLAG_LAST,
GST_PAD_BLOCKED,
GST_PAD_BLOCKED = GST_OBJECT_FLAG_LAST,
GST_PAD_FLUSHING,
GST_PAD_IN_GETCAPS,
GST_PAD_IN_SETCAPS,
@ -242,6 +241,8 @@ struct _GstRealPad {
GstPadGetRangeFunction getrangefunc;
GstPadEventFunction eventfunc;
GstActivateMode mode;
/* ghostpads */
GList *ghostpads;
guint32 ghostpads_cookie;
@ -304,6 +305,7 @@ struct _GstGhostPadClass {
#define GST_RPAD_CHECKGETRANGEFUNC(pad) (GST_REAL_PAD_CAST(pad)->checkgetrangefunc)
#define GST_RPAD_GETRANGEFUNC(pad) (GST_REAL_PAD_CAST(pad)->getrangefunc)
#define GST_RPAD_EVENTFUNC(pad) (GST_REAL_PAD_CAST(pad)->eventfunc)
#define GST_RPAD_ACTIVATE_MODE(pad) (GST_REAL_PAD_CAST(pad)->mode)
#define GST_RPAD_QUERYTYPEFUNC(pad) (GST_REAL_PAD_CAST(pad)->querytypefunc)
#define GST_RPAD_QUERYFUNC(pad) (GST_REAL_PAD_CAST(pad)->queryfunc)
#define GST_RPAD_INTLINKFUNC(pad) (GST_REAL_PAD_CAST(pad)->intlinkfunc)
@ -321,16 +323,18 @@ struct _GstGhostPadClass {
#define GST_RPAD_BUFFERALLOCFUNC(pad) (GST_REAL_PAD_CAST(pad)->bufferallocfunc)
#define GST_RPAD_IS_LINKED(pad) (GST_RPAD_PEER(pad) != NULL)
#define GST_RPAD_IS_ACTIVE(pad) (GST_FLAG_IS_SET (pad, GST_PAD_ACTIVE))
#define GST_RPAD_IS_BLOCKED(pad) (GST_FLAG_IS_SET (pad, GST_PAD_BLOCKED))
#define GST_RPAD_IS_FLUSHING(pad) (GST_FLAG_IS_SET (pad, GST_PAD_FLUSHING))
#define GST_RPAD_IS_IN_GETCAPS(pad) (GST_FLAG_IS_SET (pad, GST_PAD_IN_GETCAPS))
#define GST_RPAD_IS_IN_SETCAPS(pad) (GST_FLAG_IS_SET (pad, GST_PAD_IN_SETCAPS))
#define GST_RPAD_IS_USABLE(pad) (GST_RPAD_IS_LINKED (pad) && \
GST_RPAD_IS_ACTIVE(pad) && GST_RPAD_IS_ACTIVE(GST_RPAD_PEER (pad)))
!GST_RPAD_IS_FLUSHING(pad) && !GST_RPAD_IS_FLUSHING(GST_RPAD_PEER (pad)))
#define GST_RPAD_IS_SRC(pad) (GST_RPAD_DIRECTION(pad) == GST_PAD_SRC)
#define GST_RPAD_IS_SINK(pad) (GST_RPAD_DIRECTION(pad) == GST_PAD_SINK)
#define GST_RPAD_SET_FLUSHING(pad) (GST_FLAG_SET (pad, GST_PAD_FLUSHING))
#define GST_RPAD_UNSET_FLUSHING(pad) (GST_FLAG_UNSET (pad, GST_PAD_FLUSHING))
#define GST_STREAM_GET_LOCK(pad) (GST_PAD_REALIZE(pad)->stream_rec_lock)
#define GST_STREAM_LOCK(pad) (g_static_rec_mutex_lock(GST_STREAM_GET_LOCK(pad)))
#define GST_STREAM_TRYLOCK(pad) (g_static_rec_mutex_trylock(GST_STREAM_GET_LOCK(pad)))
@ -360,9 +364,10 @@ struct _GstGhostPadClass {
#define GST_PAD_CAPS(pad) GST_RPAD_CAPS(GST_PAD_REALIZE (pad))
#define GST_PAD_PEER(pad) GST_PAD_CAST(GST_RPAD_PEER(GST_PAD_REALIZE(pad)))
#define GST_PAD_TASK(pad) GST_RPAD_TASK(pad)
/* Some check functions (unused?) */
#define GST_PAD_IS_LINKED(pad) (GST_RPAD_IS_LINKED(GST_PAD_REALIZE(pad)))
#define GST_PAD_IS_ACTIVE(pad) (GST_RPAD_IS_ACTIVE(GST_PAD_REALIZE(pad)))
#define GST_PAD_IS_BLOCKED(pad) (GST_RPAD_IS_BLOCKED(GST_PAD_REALIZE(pad)))
#define GST_PAD_IS_FLUSHING(pad) (GST_RPAD_IS_FLUSHING(GST_PAD_REALIZE(pad)))
#define GST_PAD_IS_IN_GETCAPS(pad) (GST_RPAD_IS_IN_GETCAPS(GST_PAD_REALIZE(pad)))
@ -509,17 +514,23 @@ gboolean gst_pad_peer_accept_caps (GstPad * pad, GstCaps *caps);
GstCaps * gst_pad_get_allowed_caps (GstPad * srcpad);
GstCaps * gst_pad_get_negotiated_caps (GstPad * pad);
/* data passing functions */
/* data passing functions to peer */
GstFlowReturn gst_pad_push (GstPad *pad, GstBuffer *buffer);
gboolean gst_pad_check_pull_range (GstPad *pad);
GstFlowReturn gst_pad_pull_range (GstPad *pad, guint64 offset, guint size,
GstBuffer **buffer);
gboolean gst_pad_push_event (GstPad *pad, GstEvent *event);
gboolean gst_pad_send_event (GstPad *pad, GstEvent *event);
gboolean gst_pad_event_default (GstPad *pad, GstEvent *event);
/* data passing functions on pad */
GstFlowReturn gst_pad_chain (GstPad *pad, GstBuffer *buffer);
GstFlowReturn gst_pad_get_range (GstPad *pad, guint64 offset, guint size,
GstBuffer **buffer);
gboolean gst_pad_send_event (GstPad *pad, GstEvent *event);
/* pad tasks */
gboolean gst_pad_start_task (GstPad *pad);
gboolean gst_pad_start_task (GstPad *pad, GstTaskFunction func,
gpointer data);
gboolean gst_pad_pause_task (GstPad *pad);
gboolean gst_pad_stop_task (GstPad *pad);

View file

@ -483,9 +483,8 @@ gst_queue_handle_sink_event (GstPad * pad, GstEvent * event)
/* forward event */
gst_pad_event_default (pad, event);
if (GST_EVENT_FLUSH_DONE (event)) {
GST_STREAM_LOCK (queue->srcpad);
gst_task_start (GST_RPAD_TASK (queue->srcpad));
GST_STREAM_UNLOCK (queue->srcpad);
gst_pad_start_task (queue->srcpad, (GstTaskFunction) gst_queue_loop,
queue->srcpad);
} else {
/* now unblock the chain function */
GST_QUEUE_MUTEX_LOCK;
@ -498,10 +497,8 @@ gst_queue_handle_sink_event (GstPad * pad, GstEvent * event)
g_cond_signal (queue->item_add);
/* make sure it stops */
GST_STREAM_LOCK (queue->srcpad);
gst_task_pause (GST_RPAD_TASK (queue->srcpad));
gst_pad_pause_task (queue->srcpad);
GST_CAT_LOG_OBJECT (queue_dataflow, queue, "loop stopped");
GST_STREAM_UNLOCK (queue->srcpad);
}
goto done;
case GST_EVENT_EOS:
@ -555,8 +552,6 @@ gst_queue_chain (GstPad * pad, GstBuffer * buffer)
queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
GST_STREAM_LOCK (pad);
/* we have to lock the queue since we span threads */
GST_QUEUE_MUTEX_LOCK;
@ -633,6 +628,9 @@ gst_queue_chain (GstPad * pad, GstBuffer * buffer)
STATUS (queue, "waiting for item_del signal from thread using qlock");
g_cond_wait (queue->item_del, queue->qlock);
if (GST_RPAD_IS_FLUSHING (pad))
goto out_flushing;
/* if there's a pending state change for this queue
* or its manager, switch back to iterator so bottom
* half of state change executes */
@ -663,13 +661,11 @@ gst_queue_chain (GstPad * pad, GstBuffer * buffer)
GST_CAT_LOG_OBJECT (queue_dataflow, queue, "signalling item_add");
g_cond_signal (queue->item_add);
GST_QUEUE_MUTEX_UNLOCK;
GST_STREAM_UNLOCK (pad);
return GST_FLOW_OK;
out_unref:
GST_QUEUE_MUTEX_UNLOCK;
GST_STREAM_UNLOCK (pad);
gst_buffer_unref (buffer);
@ -678,8 +674,7 @@ out_unref:
out_flushing:
GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because of flush");
GST_QUEUE_MUTEX_UNLOCK;
gst_task_pause (GST_RPAD_TASK (queue->srcpad));
GST_STREAM_UNLOCK (pad);
gst_pad_pause_task (queue->srcpad);
gst_buffer_unref (buffer);
@ -695,8 +690,6 @@ gst_queue_loop (GstPad * pad)
queue = GST_QUEUE (GST_PAD_PARENT (pad));
GST_STREAM_LOCK (pad);
/* have to lock for thread-safety */
GST_QUEUE_MUTEX_LOCK;
@ -770,14 +763,12 @@ restart:
GST_CAT_LOG_OBJECT (queue_dataflow, queue, "signalling item_del");
g_cond_signal (queue->item_del);
GST_QUEUE_MUTEX_UNLOCK;
GST_STREAM_UNLOCK (pad);
return;
out_flushing:
GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because of flush");
gst_task_pause (GST_RPAD_TASK (pad));
gst_pad_pause_task (pad);
GST_QUEUE_MUTEX_UNLOCK;
GST_STREAM_UNLOCK (pad);
return;
}
@ -860,17 +851,7 @@ gst_queue_src_activate (GstPad * pad, GstActivateMode mode)
queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
if (mode == GST_ACTIVATE_PUSH) {
/* if we have a scheduler we can start the task */
if (GST_ELEMENT_SCHEDULER (queue)) {
GST_STREAM_LOCK (pad);
GST_RPAD_TASK (pad) =
gst_scheduler_create_task (GST_ELEMENT_SCHEDULER (queue),
(GstTaskFunction) gst_queue_loop, pad);
gst_task_start (GST_RPAD_TASK (pad));
GST_STREAM_UNLOCK (pad);
result = TRUE;
}
result = gst_pad_start_task (pad, (GstTaskFunction) gst_queue_loop, pad);
} else {
/* step 1, unblock chain and loop functions */
GST_QUEUE_MUTEX_LOCK;
@ -879,13 +860,7 @@ gst_queue_src_activate (GstPad * pad, GstActivateMode mode)
GST_QUEUE_MUTEX_UNLOCK;
/* step 2, make sure streaming finishes */
GST_STREAM_LOCK (pad);
/* step 3, stop the task */
gst_task_stop (GST_RPAD_TASK (pad));
gst_object_unref (GST_OBJECT (GST_RPAD_TASK (pad)));
GST_STREAM_UNLOCK (pad);
result = TRUE;
result = gst_pad_stop_task (pad);
}
return result;
}

View file

@ -72,6 +72,7 @@ gst_task_class_init (GstTaskClass * klass)
static void
gst_task_init (GstTask * task)
{
task->lock = NULL;
task->cond = g_cond_new ();
task->state = GST_TASK_STOPPED;
}
@ -107,6 +108,24 @@ gst_task_create (GstTaskFunction func, gpointer data)
return NULL;
}
/**
* gst_task_set_lock:
* @task: The #GstTask to use
* @mutex: The GMutex to use
*
* Set the mutex used by the task.
*
* MT safe.
*/
void
gst_task_set_lock (GstTask * task, GStaticRecMutex * mutex)
{
GST_LOCK (task);
task->lock = mutex;
GST_UNLOCK (task);
}
/**
* gst_task_get_state:
* @task: The #GstTask to query

View file

@ -54,15 +54,22 @@ typedef enum {
#define GST_TASK_SIGNAL(task) g_cond_signal(GST_TASK_GET_COND (task))
#define GST_TASK_BROADCAST(task) g_cond_breadcast(GST_TASK_GET_COND (task))
#define GST_TASK_GET_LOCK(task) (GST_TASK_CAST(task)->lock)
#define GST_TASK_LOCK(task) g_static_rec_mutex_lock(GST_TASK_GET_LOCK(task))
#define GST_TASK_UNLOCK(task) g_static_rec_mutex_unlock(GST_TASK_GET_LOCK(task))
struct _GstTask {
GstObject object;
/*< public >*/ /* with TASK_LOCK */
GstTaskState state;
GCond *cond;
/*< public >*/ /* with LOCK */
GstTaskState state;
GCond *cond;
GStaticRecMutex *lock;
GstTaskFunction func;
gpointer data;
gpointer data;
/*< private >*/
gpointer _gst_reserved[GST_PADDING];
@ -83,6 +90,7 @@ struct _GstTaskClass {
GType gst_task_get_type (void);
GstTask* gst_task_create (GstTaskFunction func, gpointer data);
void gst_task_set_lock (GstTask *task, GStaticRecMutex *mutex);
GstTaskState gst_task_get_state (GstTask *task);

View file

@ -142,9 +142,16 @@ gst_thread_scheduler_task_start (GstTask * task)
GstThreadScheduler *tsched =
GST_THREAD_SCHEDULER (GST_OBJECT_PARENT (GST_OBJECT (task)));
GstTaskState old;
GStaticRecMutex *lock;
GST_DEBUG_OBJECT (task, "Starting task %p", task);
if ((lock = GST_TASK_GET_LOCK (task)) == NULL) {
lock = g_new (GStaticRecMutex, 1);
g_static_rec_mutex_init (lock);
GST_TASK_GET_LOCK (task) = lock;
}
GST_LOCK (ttask);
old = GST_TASK_CAST (ttask)->state;
GST_TASK_CAST (ttask)->state = GST_TASK_STARTED;
@ -269,11 +276,18 @@ gst_thread_scheduler_func (GstThreadSchedulerTask * ttask,
GST_DEBUG_OBJECT (sched, "Entering task %p, thread %p", task,
g_thread_self ());
/* locking order is TASK_LOCK, LOCK */
GST_TASK_LOCK (task);
GST_LOCK (task);
while (G_LIKELY (task->state != GST_TASK_STOPPED)) {
while (G_UNLIKELY (task->state == GST_TASK_PAUSED)) {
GST_TASK_UNLOCK (task);
GST_TASK_SIGNAL (task);
GST_TASK_WAIT (task);
GST_UNLOCK (task);
/* locking order.. */
GST_TASK_LOCK (task);
GST_LOCK (task);
if (task->state == GST_TASK_STOPPED)
goto done;
}
@ -285,6 +299,7 @@ gst_thread_scheduler_func (GstThreadSchedulerTask * ttask,
}
done:
GST_UNLOCK (task);
GST_TASK_UNLOCK (task);
GST_DEBUG_OBJECT (sched, "Exit task %p, thread %p", task, g_thread_self ());

View file

@ -160,7 +160,7 @@ gst_adapter_peek (GstAdapter * adapter, guint size)
if (adapter->assembled_size < size) {
adapter->assembled_size = (size / DEFAULT_SIZE + 1) * DEFAULT_SIZE;
GST_DEBUG_OBJECT (adapter, "setting size of internal buffer to %u\n",
GST_DEBUG_OBJECT (adapter, "setting size of internal buffer to %u",
adapter->assembled_size);
adapter->assembled_data =
g_realloc (adapter->assembled_data, adapter->assembled_size);
@ -198,7 +198,7 @@ gst_adapter_flush (GstAdapter * adapter, guint flush)
g_return_if_fail (flush > 0);
g_return_if_fail (flush <= adapter->size);
GST_LOG_OBJECT (adapter, "flushing %u bytes\n", flush);
GST_LOG_OBJECT (adapter, "flushing %u bytes", flush);
adapter->size -= flush;
adapter->assembled_len = 0;
while (flush > 0) {

View file

@ -378,6 +378,8 @@ gst_basesink_preroll_queue_push (GstBaseSink * basesink, GstPad * pad,
if (basesink->preroll_queue->length == 0) {
GstBaseSinkClass *bclass = GST_BASESINK_GET_CLASS (basesink);
GST_DEBUG ("preroll buffer with TS: %" GST_TIME_FORMAT,
GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)));
if (bclass->preroll)
bclass->preroll (basesink, buffer);
}
@ -448,7 +450,7 @@ PrerollReturn
gst_basesink_finish_preroll (GstBaseSink * basesink, GstPad * pad,
GstBuffer * buffer)
{
gboolean usable;
gboolean flushing;
DEBUG ("finish preroll %p <\n", basesink);
/* lock order is important */
@ -461,14 +463,14 @@ gst_basesink_finish_preroll (GstBaseSink * basesink, GstPad * pad,
gst_element_commit_state (GST_ELEMENT (basesink));
GST_STATE_UNLOCK (basesink);
GST_LOCK (pad);
usable = !GST_RPAD_IS_FLUSHING (pad) && GST_RPAD_IS_ACTIVE (pad);
GST_UNLOCK (pad);
if (!usable)
goto unusable;
gst_basesink_preroll_queue_push (basesink, pad, buffer);
GST_LOCK (pad);
flushing = GST_RPAD_IS_FLUSHING (pad);
GST_UNLOCK (pad);
if (flushing)
goto flushing;
if (basesink->need_preroll)
goto still_queueing;
@ -490,7 +492,7 @@ no_preroll:
GST_STATE_UNLOCK (basesink);
return PREROLL_PLAYING;
}
unusable:
flushing:
{
GST_DEBUG ("pad is flushing");
GST_PREROLL_UNLOCK (pad);
@ -726,12 +728,8 @@ gst_basesink_chain (GstPad * pad, GstBuffer * buf)
g_assert (GST_BASESINK (GST_OBJECT_PARENT (pad))->pad_mode ==
GST_ACTIVATE_PUSH);
GST_STREAM_LOCK (pad);
result = gst_basesink_chain_unlocked (pad, buf);
GST_STREAM_UNLOCK (pad);
return result;
}
@ -748,8 +746,6 @@ gst_basesink_loop (GstPad * pad)
g_assert (basesink->pad_mode == GST_ACTIVATE_PULL);
GST_STREAM_LOCK (pad);
result = gst_pad_pull_range (pad, basesink->offset, DEFAULT_SIZE, &buf);
if (result != GST_FLOW_OK)
goto paused;
@ -759,12 +755,10 @@ gst_basesink_loop (GstPad * pad)
goto paused;
/* default */
GST_STREAM_UNLOCK (pad);
return;
paused:
gst_task_pause (GST_RPAD_TASK (pad));
GST_STREAM_UNLOCK (pad);
gst_pad_pause_task (pad);
return;
}
@ -787,16 +781,8 @@ gst_basesink_activate (GstPad * pad, GstActivateMode mode)
/* if we have a scheduler we can start the task */
g_return_val_if_fail (basesink->has_loop, FALSE);
gst_pad_peer_set_active (pad, mode);
if (GST_ELEMENT_SCHEDULER (basesink)) {
GST_STREAM_LOCK (pad);
GST_RPAD_TASK (pad) =
gst_scheduler_create_task (GST_ELEMENT_SCHEDULER (basesink),
(GstTaskFunction) gst_basesink_loop, pad);
gst_task_start (GST_RPAD_TASK (pad));
GST_STREAM_UNLOCK (pad);
result = TRUE;
}
result =
gst_pad_start_task (pad, (GstTaskFunction) gst_basesink_loop, pad);
break;
case GST_ACTIVATE_NONE:
/* step 1, unblock clock sync (if any) or any other blocking thing */
@ -816,16 +802,7 @@ gst_basesink_activate (GstPad * pad, GstActivateMode mode)
GST_PREROLL_UNLOCK (pad);
/* step 2, make sure streaming finishes */
GST_STREAM_LOCK (pad);
/* step 3, stop the task */
if (GST_RPAD_TASK (pad)) {
gst_task_stop (GST_RPAD_TASK (pad));
gst_object_unref (GST_OBJECT (GST_RPAD_TASK (pad)));
GST_RPAD_TASK (pad) = NULL;
}
GST_STREAM_UNLOCK (pad);
result = TRUE;
result = gst_pad_stop_task (pad);
break;
}
basesink->pad_mode = mode;
@ -911,9 +888,6 @@ gst_basesink_change_state (GstElement * element)
basesink->have_preroll = FALSE;
GST_PREROLL_UNLOCK (basesink->sinkpad);
/* make sure the element is finished processing */
GST_STREAM_LOCK (basesink->sinkpad);
GST_STREAM_UNLOCK (basesink->sinkpad);
/* clear EOS state */
basesink->eos = FALSE;
break;

View file

@ -333,9 +333,8 @@ gst_basesrc_do_seek (GstBaseSrc * src, GstEvent * event)
}
/* and restart the task */
if (GST_RPAD_TASK (src->srcpad)) {
gst_task_start (GST_RPAD_TASK (src->srcpad));
}
gst_pad_start_task (src->srcpad, (GstTaskFunction) gst_basesrc_loop,
src->srcpad);
GST_STREAM_UNLOCK (src->srcpad);
gst_event_unref (event);
@ -447,7 +446,7 @@ gst_basesrc_get_property (GObject * object, guint prop_id, GValue * value,
}
static GstFlowReturn
gst_basesrc_get_range_unlocked (GstPad * pad, guint64 offset, guint length,
gst_basesrc_get_range (GstPad * pad, guint64 offset, guint length,
GstBuffer ** buf)
{
GstFlowReturn ret;
@ -499,21 +498,6 @@ unexpected_length:
}
}
static GstFlowReturn
gst_basesrc_get_range (GstPad * pad, guint64 offset, guint length,
GstBuffer ** ret)
{
GstFlowReturn fret;
GST_STREAM_LOCK (pad);
fret = gst_basesrc_get_range_unlocked (pad, offset, length, ret);
GST_STREAM_UNLOCK (pad);
return fret;
}
static gboolean
gst_basesrc_check_get_range (GstPad * pad)
{
@ -538,9 +522,7 @@ gst_basesrc_loop (GstPad * pad)
src = GST_BASESRC (GST_OBJECT_PARENT (pad));
GST_STREAM_LOCK (pad);
ret = gst_basesrc_get_range_unlocked (pad, src->offset, src->blocksize, &buf);
ret = gst_basesrc_get_range (pad, src->offset, src->blocksize, &buf);
if (ret != GST_FLOW_OK)
goto eos;
@ -550,22 +532,19 @@ gst_basesrc_loop (GstPad * pad)
if (ret != GST_FLOW_OK)
goto pause;
GST_STREAM_UNLOCK (pad);
return;
eos:
{
GST_DEBUG_OBJECT (src, "going to EOS");
gst_task_pause (GST_RPAD_TASK (pad));
gst_pad_pause_task (pad);
gst_pad_push_event (pad, gst_event_new (GST_EVENT_EOS));
GST_STREAM_UNLOCK (pad);
return;
}
pause:
{
GST_DEBUG_OBJECT (src, "pausing task");
gst_task_pause (GST_RPAD_TASK (pad));
GST_STREAM_UNLOCK (pad);
gst_pad_pause_task (pad);
return;
}
}
@ -733,17 +712,8 @@ gst_basesrc_activate (GstPad * pad, GstActivateMode mode)
result = FALSE;
switch (mode) {
case GST_ACTIVATE_PUSH:
/* if we have a scheduler we can start the task */
if (GST_ELEMENT_SCHEDULER (basesrc)) {
GST_STREAM_LOCK (pad);
GST_RPAD_TASK (pad) =
gst_scheduler_create_task (GST_ELEMENT_SCHEDULER (basesrc),
(GstTaskFunction) gst_basesrc_loop, pad);
gst_task_start (GST_RPAD_TASK (pad));
GST_STREAM_UNLOCK (pad);
result = TRUE;
}
result =
gst_pad_start_task (pad, (GstTaskFunction) gst_basesrc_loop, pad);
break;
case GST_ACTIVATE_PULL:
result = TRUE;
@ -753,16 +723,7 @@ gst_basesrc_activate (GstPad * pad, GstActivateMode mode)
gst_basesrc_unlock (basesrc);
/* step 2, make sure streaming finishes */
GST_STREAM_LOCK (pad);
/* step 3, stop the task */
if (GST_RPAD_TASK (pad)) {
gst_task_stop (GST_RPAD_TASK (pad));
gst_object_unref (GST_OBJECT (GST_RPAD_TASK (pad)));
GST_RPAD_TASK (pad) = NULL;
}
GST_STREAM_UNLOCK (pad);
result = TRUE;
result = gst_pad_stop_task (pad);
break;
}
return result;

View file

@ -483,9 +483,8 @@ gst_queue_handle_sink_event (GstPad * pad, GstEvent * event)
/* forward event */
gst_pad_event_default (pad, event);
if (GST_EVENT_FLUSH_DONE (event)) {
GST_STREAM_LOCK (queue->srcpad);
gst_task_start (GST_RPAD_TASK (queue->srcpad));
GST_STREAM_UNLOCK (queue->srcpad);
gst_pad_start_task (queue->srcpad, (GstTaskFunction) gst_queue_loop,
queue->srcpad);
} else {
/* now unblock the chain function */
GST_QUEUE_MUTEX_LOCK;
@ -498,10 +497,8 @@ gst_queue_handle_sink_event (GstPad * pad, GstEvent * event)
g_cond_signal (queue->item_add);
/* make sure it stops */
GST_STREAM_LOCK (queue->srcpad);
gst_task_pause (GST_RPAD_TASK (queue->srcpad));
gst_pad_pause_task (queue->srcpad);
GST_CAT_LOG_OBJECT (queue_dataflow, queue, "loop stopped");
GST_STREAM_UNLOCK (queue->srcpad);
}
goto done;
case GST_EVENT_EOS:
@ -555,8 +552,6 @@ gst_queue_chain (GstPad * pad, GstBuffer * buffer)
queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
GST_STREAM_LOCK (pad);
/* we have to lock the queue since we span threads */
GST_QUEUE_MUTEX_LOCK;
@ -633,6 +628,9 @@ gst_queue_chain (GstPad * pad, GstBuffer * buffer)
STATUS (queue, "waiting for item_del signal from thread using qlock");
g_cond_wait (queue->item_del, queue->qlock);
if (GST_RPAD_IS_FLUSHING (pad))
goto out_flushing;
/* if there's a pending state change for this queue
* or its manager, switch back to iterator so bottom
* half of state change executes */
@ -663,13 +661,11 @@ gst_queue_chain (GstPad * pad, GstBuffer * buffer)
GST_CAT_LOG_OBJECT (queue_dataflow, queue, "signalling item_add");
g_cond_signal (queue->item_add);
GST_QUEUE_MUTEX_UNLOCK;
GST_STREAM_UNLOCK (pad);
return GST_FLOW_OK;
out_unref:
GST_QUEUE_MUTEX_UNLOCK;
GST_STREAM_UNLOCK (pad);
gst_buffer_unref (buffer);
@ -678,8 +674,7 @@ out_unref:
out_flushing:
GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because of flush");
GST_QUEUE_MUTEX_UNLOCK;
gst_task_pause (GST_RPAD_TASK (queue->srcpad));
GST_STREAM_UNLOCK (pad);
gst_pad_pause_task (queue->srcpad);
gst_buffer_unref (buffer);
@ -695,8 +690,6 @@ gst_queue_loop (GstPad * pad)
queue = GST_QUEUE (GST_PAD_PARENT (pad));
GST_STREAM_LOCK (pad);
/* have to lock for thread-safety */
GST_QUEUE_MUTEX_LOCK;
@ -770,14 +763,12 @@ restart:
GST_CAT_LOG_OBJECT (queue_dataflow, queue, "signalling item_del");
g_cond_signal (queue->item_del);
GST_QUEUE_MUTEX_UNLOCK;
GST_STREAM_UNLOCK (pad);
return;
out_flushing:
GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because of flush");
gst_task_pause (GST_RPAD_TASK (pad));
gst_pad_pause_task (pad);
GST_QUEUE_MUTEX_UNLOCK;
GST_STREAM_UNLOCK (pad);
return;
}
@ -860,17 +851,7 @@ gst_queue_src_activate (GstPad * pad, GstActivateMode mode)
queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
if (mode == GST_ACTIVATE_PUSH) {
/* if we have a scheduler we can start the task */
if (GST_ELEMENT_SCHEDULER (queue)) {
GST_STREAM_LOCK (pad);
GST_RPAD_TASK (pad) =
gst_scheduler_create_task (GST_ELEMENT_SCHEDULER (queue),
(GstTaskFunction) gst_queue_loop, pad);
gst_task_start (GST_RPAD_TASK (pad));
GST_STREAM_UNLOCK (pad);
result = TRUE;
}
result = gst_pad_start_task (pad, (GstTaskFunction) gst_queue_loop, pad);
} else {
/* step 1, unblock chain and loop functions */
GST_QUEUE_MUTEX_LOCK;
@ -879,13 +860,7 @@ gst_queue_src_activate (GstPad * pad, GstActivateMode mode)
GST_QUEUE_MUTEX_UNLOCK;
/* step 2, make sure streaming finishes */
GST_STREAM_LOCK (pad);
/* step 3, stop the task */
gst_task_stop (GST_RPAD_TASK (pad));
gst_object_unref (GST_OBJECT (GST_RPAD_TASK (pad)));
GST_STREAM_UNLOCK (pad);
result = TRUE;
result = gst_pad_stop_task (pad);
}
return result;
}

View file

@ -370,27 +370,10 @@ gst_tee_sink_activate (GstPad * pad, GstActivateMode mode)
break;
case GST_ACTIVATE_PULL:
g_return_val_if_fail (tee->has_sink_loop, FALSE);
if (GST_ELEMENT_SCHEDULER (tee)) {
GST_STREAM_LOCK (pad);
GST_RPAD_TASK (pad) =
gst_scheduler_create_task (GST_ELEMENT_SCHEDULER (tee),
(GstTaskFunction) gst_tee_loop, pad);
gst_pad_start_task (pad);
GST_STREAM_UNLOCK (pad);
result = TRUE;
}
result = gst_pad_start_task (pad, (GstTaskFunction) gst_tee_loop, pad);
break;
case GST_ACTIVATE_NONE:
GST_STREAM_LOCK (pad);
if (GST_RPAD_TASK (pad)) {
gst_pad_stop_task (pad);
gst_object_unref (GST_OBJECT (GST_RPAD_TASK (pad)));
GST_RPAD_TASK (pad) = NULL;
}
GST_STREAM_UNLOCK (pad);
result = TRUE;
result = gst_pad_stop_task (pad);
break;
}
tee->sink_mode = mode;