mirror of
https://gitlab.freedesktop.org/gstreamer/gstreamer.git
synced 2024-12-04 15:36:35 +00:00
gst/base/gstbasesrc.*: Add a gboolean to decide when to push out a discont.
Original commit message from CVS: * gst/base/gstbasesrc.c: (gst_base_src_init), (gst_base_src_do_seek), (gst_base_src_loop), (gst_base_src_start): * gst/base/gstbasesrc.h: Add a gboolean to decide when to push out a discont. * gst/gstqueue.c: (gst_queue_handle_sink_event), (gst_queue_chain), (gst_queue_loop), (gst_queue_handle_src_query), (gst_queue_sink_activate_push), (gst_queue_src_activate_push), (gst_queue_set_property), (gst_queue_get_property): Some cleanups. * tests/threadstate/threadstate1.c: (main): Make a thread test compile and run... very silly..
This commit is contained in:
parent
2c15dc146e
commit
f73fb55d1e
8 changed files with 141 additions and 140 deletions
17
ChangeLog
17
ChangeLog
|
@ -1,3 +1,20 @@
|
||||||
|
2005-07-20 Wim Taymans <wim@fluendo.com>
|
||||||
|
|
||||||
|
* gst/base/gstbasesrc.c: (gst_base_src_init),
|
||||||
|
(gst_base_src_do_seek), (gst_base_src_loop), (gst_base_src_start):
|
||||||
|
* gst/base/gstbasesrc.h:
|
||||||
|
Add a gboolean to decide when to push out a discont.
|
||||||
|
|
||||||
|
* gst/gstqueue.c: (gst_queue_handle_sink_event), (gst_queue_chain),
|
||||||
|
(gst_queue_loop), (gst_queue_handle_src_query),
|
||||||
|
(gst_queue_sink_activate_push), (gst_queue_src_activate_push),
|
||||||
|
(gst_queue_set_property), (gst_queue_get_property):
|
||||||
|
Some cleanups.
|
||||||
|
|
||||||
|
* tests/threadstate/threadstate1.c: (main):
|
||||||
|
Make a thread test compile and run... very silly..
|
||||||
|
|
||||||
|
|
||||||
2005-07-20 Ronald S. Bultje <rbultje@ronald.bitfreak.net>
|
2005-07-20 Ronald S. Bultje <rbultje@ronald.bitfreak.net>
|
||||||
|
|
||||||
* docs/manual/appendix-porting.xml:
|
* docs/manual/appendix-porting.xml:
|
||||||
|
|
|
@ -195,6 +195,7 @@ gst_base_src_init (GstBaseSrc * basesrc, gpointer g_class)
|
||||||
|
|
||||||
basesrc->segment_start = -1;
|
basesrc->segment_start = -1;
|
||||||
basesrc->segment_end = -1;
|
basesrc->segment_end = -1;
|
||||||
|
basesrc->need_discont = TRUE;
|
||||||
basesrc->blocksize = DEFAULT_BLOCKSIZE;
|
basesrc->blocksize = DEFAULT_BLOCKSIZE;
|
||||||
basesrc->clock_id = NULL;
|
basesrc->clock_id = NULL;
|
||||||
|
|
||||||
|
@ -432,8 +433,8 @@ gst_base_src_do_seek (GstBaseSrc * src, GstEvent * event)
|
||||||
goto error;
|
goto error;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* now send discont */
|
/* now make sure the discont will be send */
|
||||||
gst_base_src_send_discont (src);
|
src->need_discont = TRUE;
|
||||||
|
|
||||||
/* and restart the task */
|
/* and restart the task */
|
||||||
gst_pad_start_task (src->srcpad, (GstTaskFunction) gst_base_src_loop,
|
gst_pad_start_task (src->srcpad, (GstTaskFunction) gst_base_src_loop,
|
||||||
|
@ -669,9 +670,10 @@ gst_base_src_loop (GstPad * pad)
|
||||||
|
|
||||||
src = GST_BASE_SRC (GST_OBJECT_PARENT (pad));
|
src = GST_BASE_SRC (GST_OBJECT_PARENT (pad));
|
||||||
|
|
||||||
if (src->offset == 0) {
|
if (src->need_discont) {
|
||||||
/* now send discont */
|
/* now send discont */
|
||||||
gst_base_src_send_discont (src);
|
gst_base_src_send_discont (src);
|
||||||
|
src->need_discont = FALSE;
|
||||||
}
|
}
|
||||||
|
|
||||||
ret = gst_base_src_get_range (pad, src->offset, src->blocksize, &buf);
|
ret = gst_base_src_get_range (pad, src->offset, src->blocksize, &buf);
|
||||||
|
@ -871,7 +873,6 @@ gst_base_src_start (GstBaseSrc * basesrc)
|
||||||
|
|
||||||
/* start in the beginning */
|
/* start in the beginning */
|
||||||
basesrc->offset = 0;
|
basesrc->offset = 0;
|
||||||
basesrc->segment_start = 0;
|
|
||||||
|
|
||||||
/* figure out the size */
|
/* figure out the size */
|
||||||
if (bclass->get_size) {
|
if (bclass->get_size) {
|
||||||
|
@ -886,7 +887,9 @@ gst_base_src_start (GstBaseSrc * basesrc)
|
||||||
GST_DEBUG ("size %d %lld", result, basesrc->size);
|
GST_DEBUG ("size %d %lld", result, basesrc->size);
|
||||||
|
|
||||||
/* we always run to the end */
|
/* we always run to the end */
|
||||||
|
basesrc->segment_start = 0;
|
||||||
basesrc->segment_end = basesrc->size;
|
basesrc->segment_end = basesrc->size;
|
||||||
|
basesrc->need_discont = TRUE;
|
||||||
|
|
||||||
/* check if we can seek, updates ->seekable */
|
/* check if we can seek, updates ->seekable */
|
||||||
gst_base_src_is_seekable (basesrc);
|
gst_base_src_is_seekable (basesrc);
|
||||||
|
|
|
@ -92,6 +92,7 @@ struct _GstBaseSrc {
|
||||||
gint64 segment_start; /* start and end positions for seeking */
|
gint64 segment_start; /* start and end positions for seeking */
|
||||||
gint64 segment_end;
|
gint64 segment_end;
|
||||||
gboolean segment_loop;
|
gboolean segment_loop;
|
||||||
|
gboolean need_discont;
|
||||||
|
|
||||||
guint64 offset; /* current offset in the resource */
|
guint64 offset; /* current offset in the resource */
|
||||||
guint64 size; /* total size of the resource */
|
guint64 size; /* total size of the resource */
|
||||||
|
|
114
gst/gstqueue.c
114
gst/gstqueue.c
|
@ -95,21 +95,27 @@ enum
|
||||||
/* FILL ME */
|
/* FILL ME */
|
||||||
};
|
};
|
||||||
|
|
||||||
#define GST_QUEUE_MUTEX_LOCK G_STMT_START { \
|
#define GST_QUEUE_MUTEX_LOCK(q) G_STMT_START { \
|
||||||
GST_CAT_LOG_OBJECT (queue_dataflow, queue, \
|
GST_CAT_LOG_OBJECT (queue_dataflow, q, \
|
||||||
"locking qlock from thread %p", \
|
"locking qlock from thread %p", \
|
||||||
g_thread_self ()); \
|
g_thread_self ()); \
|
||||||
g_mutex_lock (queue->qlock); \
|
g_mutex_lock (q->qlock); \
|
||||||
GST_CAT_LOG_OBJECT (queue_dataflow, queue, \
|
GST_CAT_LOG_OBJECT (queue_dataflow, q, \
|
||||||
"locked qlock from thread %p", \
|
"locked qlock from thread %p", \
|
||||||
g_thread_self ()); \
|
g_thread_self ()); \
|
||||||
} G_STMT_END
|
} G_STMT_END
|
||||||
|
|
||||||
#define GST_QUEUE_MUTEX_UNLOCK G_STMT_START { \
|
#define GST_QUEUE_MUTEX_LOCK_CHECK(q,label) G_STMT_START { \
|
||||||
GST_CAT_LOG_OBJECT (queue_dataflow, queue, \
|
GST_QUEUE_MUTEX_LOCK (q); \
|
||||||
|
if (q->srcresult != GST_FLOW_OK) \
|
||||||
|
goto label; \
|
||||||
|
} G_STMT_END
|
||||||
|
|
||||||
|
#define GST_QUEUE_MUTEX_UNLOCK(q) G_STMT_START { \
|
||||||
|
GST_CAT_LOG_OBJECT (queue_dataflow, q, \
|
||||||
"unlocking qlock from thread %p", \
|
"unlocking qlock from thread %p", \
|
||||||
g_thread_self ()); \
|
g_thread_self ()); \
|
||||||
g_mutex_unlock (queue->qlock); \
|
g_mutex_unlock (q->qlock); \
|
||||||
} G_STMT_END
|
} G_STMT_END
|
||||||
|
|
||||||
|
|
||||||
|
@ -468,25 +474,25 @@ gst_queue_handle_sink_event (GstPad * pad, GstEvent * event)
|
||||||
switch (GST_EVENT_TYPE (event)) {
|
switch (GST_EVENT_TYPE (event)) {
|
||||||
case GST_EVENT_FLUSH:
|
case GST_EVENT_FLUSH:
|
||||||
STATUS (queue, "received flush event");
|
STATUS (queue, "received flush event");
|
||||||
/* forward event */
|
/* forward event, re first as we're going to use it still */
|
||||||
gst_event_ref (event);
|
gst_event_ref (event);
|
||||||
gst_pad_push_event (queue->srcpad, event);
|
gst_pad_push_event (queue->srcpad, event);
|
||||||
if (GST_EVENT_FLUSH_DONE (event)) {
|
if (GST_EVENT_FLUSH_DONE (event)) {
|
||||||
GST_QUEUE_MUTEX_LOCK;
|
GST_QUEUE_MUTEX_LOCK (queue);
|
||||||
gst_queue_locked_flush (queue);
|
gst_queue_locked_flush (queue);
|
||||||
queue->srcresult = GST_FLOW_OK;
|
queue->srcresult = GST_FLOW_OK;
|
||||||
gst_pad_start_task (queue->srcpad, (GstTaskFunction) gst_queue_loop,
|
gst_pad_start_task (queue->srcpad, (GstTaskFunction) gst_queue_loop,
|
||||||
queue->srcpad);
|
queue->srcpad);
|
||||||
GST_QUEUE_MUTEX_UNLOCK;
|
GST_QUEUE_MUTEX_UNLOCK (queue);
|
||||||
|
|
||||||
|
STATUS (queue, "after flush");
|
||||||
} else {
|
} else {
|
||||||
/* now unblock the chain function */
|
/* now unblock the chain function */
|
||||||
GST_QUEUE_MUTEX_LOCK;
|
GST_QUEUE_MUTEX_LOCK (queue);
|
||||||
queue->srcresult = GST_FLOW_WRONG_STATE;
|
queue->srcresult = GST_FLOW_WRONG_STATE;
|
||||||
/* unblock the loop function */
|
/* unblock the loop function */
|
||||||
g_cond_signal (queue->item_add);
|
g_cond_signal (queue->item_add);
|
||||||
GST_QUEUE_MUTEX_UNLOCK;
|
GST_QUEUE_MUTEX_UNLOCK (queue);
|
||||||
|
|
||||||
STATUS (queue, "after flush");
|
|
||||||
|
|
||||||
/* make sure it pauses */
|
/* make sure it pauses */
|
||||||
gst_pad_pause_task (queue->srcpad);
|
gst_pad_pause_task (queue->srcpad);
|
||||||
|
@ -504,11 +510,11 @@ gst_queue_handle_sink_event (GstPad * pad, GstEvent * event)
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
GST_QUEUE_MUTEX_LOCK;
|
GST_QUEUE_MUTEX_LOCK (queue);
|
||||||
g_queue_push_tail (queue->queue, event);
|
g_queue_push_tail (queue->queue, event);
|
||||||
g_cond_signal (queue->item_add);
|
g_cond_signal (queue->item_add);
|
||||||
|
GST_QUEUE_MUTEX_UNLOCK (queue);
|
||||||
|
|
||||||
GST_QUEUE_MUTEX_UNLOCK;
|
|
||||||
done:
|
done:
|
||||||
|
|
||||||
return TRUE;
|
return TRUE;
|
||||||
|
@ -546,10 +552,7 @@ gst_queue_chain (GstPad * pad, GstBuffer * buffer)
|
||||||
queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
|
queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
|
||||||
|
|
||||||
/* we have to lock the queue since we span threads */
|
/* we have to lock the queue since we span threads */
|
||||||
GST_QUEUE_MUTEX_LOCK;
|
GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
|
||||||
|
|
||||||
if (queue->srcresult != GST_FLOW_OK)
|
|
||||||
goto out_flushing;
|
|
||||||
|
|
||||||
GST_CAT_LOG_OBJECT (queue_dataflow, queue,
|
GST_CAT_LOG_OBJECT (queue_dataflow, queue,
|
||||||
"adding buffer %p of size %d", buffer, GST_BUFFER_SIZE (buffer));
|
"adding buffer %p of size %d", buffer, GST_BUFFER_SIZE (buffer));
|
||||||
|
@ -558,12 +561,9 @@ gst_queue_chain (GstPad * pad, GstBuffer * buffer)
|
||||||
* the user defined as "full". Note that this only applies to buffers.
|
* the user defined as "full". Note that this only applies to buffers.
|
||||||
* We always handle events and they don't count in our statistics. */
|
* We always handle events and they don't count in our statistics. */
|
||||||
while (gst_queue_is_filled (queue)) {
|
while (gst_queue_is_filled (queue)) {
|
||||||
GST_QUEUE_MUTEX_UNLOCK;
|
GST_QUEUE_MUTEX_UNLOCK (queue);
|
||||||
g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_OVERRUN], 0);
|
g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_OVERRUN], 0);
|
||||||
GST_QUEUE_MUTEX_LOCK;
|
GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
|
||||||
|
|
||||||
if (queue->srcresult != GST_FLOW_OK)
|
|
||||||
goto out_flushing;
|
|
||||||
|
|
||||||
/* how are we going to make space for this buffer? */
|
/* how are we going to make space for this buffer? */
|
||||||
switch (queue->leaky) {
|
switch (queue->leaky) {
|
||||||
|
@ -637,12 +637,9 @@ gst_queue_chain (GstPad * pad, GstBuffer * buffer)
|
||||||
}
|
}
|
||||||
|
|
||||||
STATUS (queue, "post-full wait");
|
STATUS (queue, "post-full wait");
|
||||||
GST_QUEUE_MUTEX_UNLOCK;
|
GST_QUEUE_MUTEX_UNLOCK (queue);
|
||||||
g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_RUNNING], 0);
|
g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_RUNNING], 0);
|
||||||
GST_QUEUE_MUTEX_LOCK;
|
GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
|
||||||
|
|
||||||
if (queue->srcresult != GST_FLOW_OK)
|
|
||||||
goto out_flushing;
|
|
||||||
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -660,14 +657,14 @@ gst_queue_chain (GstPad * pad, GstBuffer * buffer)
|
||||||
|
|
||||||
GST_CAT_LOG_OBJECT (queue_dataflow, queue, "signalling item_add");
|
GST_CAT_LOG_OBJECT (queue_dataflow, queue, "signalling item_add");
|
||||||
g_cond_signal (queue->item_add);
|
g_cond_signal (queue->item_add);
|
||||||
GST_QUEUE_MUTEX_UNLOCK;
|
GST_QUEUE_MUTEX_UNLOCK (queue);
|
||||||
|
|
||||||
return GST_FLOW_OK;
|
return GST_FLOW_OK;
|
||||||
|
|
||||||
/* special conditions */
|
/* special conditions */
|
||||||
out_unref:
|
out_unref:
|
||||||
{
|
{
|
||||||
GST_QUEUE_MUTEX_UNLOCK;
|
GST_QUEUE_MUTEX_UNLOCK (queue);
|
||||||
|
|
||||||
gst_buffer_unref (buffer);
|
gst_buffer_unref (buffer);
|
||||||
|
|
||||||
|
@ -679,7 +676,7 @@ out_flushing:
|
||||||
|
|
||||||
GST_CAT_LOG_OBJECT (queue_dataflow, queue,
|
GST_CAT_LOG_OBJECT (queue_dataflow, queue,
|
||||||
"exit because task paused, reason: %d", ret);
|
"exit because task paused, reason: %d", ret);
|
||||||
GST_QUEUE_MUTEX_UNLOCK;
|
GST_QUEUE_MUTEX_UNLOCK (queue);
|
||||||
|
|
||||||
gst_buffer_unref (buffer);
|
gst_buffer_unref (buffer);
|
||||||
|
|
||||||
|
@ -697,19 +694,13 @@ gst_queue_loop (GstPad * pad)
|
||||||
queue = GST_QUEUE (GST_PAD_PARENT (pad));
|
queue = GST_QUEUE (GST_PAD_PARENT (pad));
|
||||||
|
|
||||||
/* have to lock for thread-safety */
|
/* have to lock for thread-safety */
|
||||||
GST_QUEUE_MUTEX_LOCK;
|
GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
|
||||||
|
|
||||||
restart:
|
restart:
|
||||||
if (queue->srcresult != GST_FLOW_OK)
|
|
||||||
goto out_flushing;
|
|
||||||
|
|
||||||
while (gst_queue_is_empty (queue)) {
|
while (gst_queue_is_empty (queue)) {
|
||||||
GST_QUEUE_MUTEX_UNLOCK;
|
GST_QUEUE_MUTEX_UNLOCK (queue);
|
||||||
g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_UNDERRUN], 0);
|
g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_UNDERRUN], 0);
|
||||||
GST_QUEUE_MUTEX_LOCK;
|
GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
|
||||||
|
|
||||||
if (queue->srcresult != GST_FLOW_OK)
|
|
||||||
goto out_flushing;
|
|
||||||
|
|
||||||
STATUS (queue, "pre-empty wait");
|
STATUS (queue, "pre-empty wait");
|
||||||
while (gst_queue_is_empty (queue)) {
|
while (gst_queue_is_empty (queue)) {
|
||||||
|
@ -730,12 +721,9 @@ restart:
|
||||||
}
|
}
|
||||||
|
|
||||||
STATUS (queue, "post-empty wait");
|
STATUS (queue, "post-empty wait");
|
||||||
GST_QUEUE_MUTEX_UNLOCK;
|
GST_QUEUE_MUTEX_UNLOCK (queue);
|
||||||
g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_RUNNING], 0);
|
g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_RUNNING], 0);
|
||||||
GST_QUEUE_MUTEX_LOCK;
|
GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
|
||||||
|
|
||||||
if (queue->srcresult != GST_FLOW_OK)
|
|
||||||
goto out_flushing;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/* There's something in the list now, whatever it is */
|
/* There's something in the list now, whatever it is */
|
||||||
|
@ -752,9 +740,9 @@ restart:
|
||||||
if (GST_BUFFER_DURATION (data) != GST_CLOCK_TIME_NONE)
|
if (GST_BUFFER_DURATION (data) != GST_CLOCK_TIME_NONE)
|
||||||
queue->cur_level.time -= GST_BUFFER_DURATION (data);
|
queue->cur_level.time -= GST_BUFFER_DURATION (data);
|
||||||
|
|
||||||
GST_QUEUE_MUTEX_UNLOCK;
|
GST_QUEUE_MUTEX_UNLOCK (queue);
|
||||||
result = gst_pad_push (pad, GST_BUFFER (data));
|
result = gst_pad_push (pad, GST_BUFFER (data));
|
||||||
GST_QUEUE_MUTEX_LOCK;
|
GST_QUEUE_MUTEX_LOCK (queue);
|
||||||
/* can opt to check for srcresult here but the push should
|
/* can opt to check for srcresult here but the push should
|
||||||
* return an error value that is more accurate */
|
* return an error value that is more accurate */
|
||||||
if (result != GST_FLOW_OK) {
|
if (result != GST_FLOW_OK) {
|
||||||
|
@ -775,9 +763,9 @@ restart:
|
||||||
gst_pad_pause_task (queue->srcpad);
|
gst_pad_pause_task (queue->srcpad);
|
||||||
restart = FALSE;
|
restart = FALSE;
|
||||||
}
|
}
|
||||||
GST_QUEUE_MUTEX_UNLOCK;
|
GST_QUEUE_MUTEX_UNLOCK (queue);
|
||||||
gst_pad_push_event (queue->srcpad, GST_EVENT (data));
|
gst_pad_push_event (queue->srcpad, GST_EVENT (data));
|
||||||
GST_QUEUE_MUTEX_LOCK;
|
GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
|
||||||
if (restart == TRUE)
|
if (restart == TRUE)
|
||||||
goto restart;
|
goto restart;
|
||||||
}
|
}
|
||||||
|
@ -786,7 +774,7 @@ restart:
|
||||||
|
|
||||||
GST_CAT_LOG_OBJECT (queue_dataflow, queue, "signalling item_del");
|
GST_CAT_LOG_OBJECT (queue_dataflow, queue, "signalling item_del");
|
||||||
g_cond_signal (queue->item_del);
|
g_cond_signal (queue->item_del);
|
||||||
GST_QUEUE_MUTEX_UNLOCK;
|
GST_QUEUE_MUTEX_UNLOCK (queue);
|
||||||
|
|
||||||
return;
|
return;
|
||||||
|
|
||||||
|
@ -794,7 +782,7 @@ out_flushing:
|
||||||
{
|
{
|
||||||
GST_CAT_LOG_OBJECT (queue_dataflow, queue,
|
GST_CAT_LOG_OBJECT (queue_dataflow, queue,
|
||||||
"exit because task paused, reason: %d", queue->srcresult);
|
"exit because task paused, reason: %d", queue->srcresult);
|
||||||
GST_QUEUE_MUTEX_UNLOCK;
|
GST_QUEUE_MUTEX_UNLOCK (queue);
|
||||||
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -877,11 +865,11 @@ gst_queue_sink_activate_push (GstPad * pad, gboolean active)
|
||||||
result = TRUE;
|
result = TRUE;
|
||||||
} else {
|
} else {
|
||||||
/* step 1, unblock chain and loop functions */
|
/* step 1, unblock chain and loop functions */
|
||||||
GST_QUEUE_MUTEX_LOCK;
|
GST_QUEUE_MUTEX_LOCK (queue);
|
||||||
queue->srcresult = GST_FLOW_WRONG_STATE;
|
queue->srcresult = GST_FLOW_WRONG_STATE;
|
||||||
gst_queue_locked_flush (queue);
|
gst_queue_locked_flush (queue);
|
||||||
g_cond_signal (queue->item_del);
|
g_cond_signal (queue->item_del);
|
||||||
GST_QUEUE_MUTEX_UNLOCK;
|
GST_QUEUE_MUTEX_UNLOCK (queue);
|
||||||
|
|
||||||
/* step 2, make sure streaming finishes */
|
/* step 2, make sure streaming finishes */
|
||||||
result = gst_pad_stop_task (pad);
|
result = gst_pad_stop_task (pad);
|
||||||
|
@ -900,16 +888,16 @@ gst_queue_src_activate_push (GstPad * pad, gboolean active)
|
||||||
queue = GST_QUEUE (gst_pad_get_parent (pad));
|
queue = GST_QUEUE (gst_pad_get_parent (pad));
|
||||||
|
|
||||||
if (active) {
|
if (active) {
|
||||||
GST_QUEUE_MUTEX_LOCK;
|
GST_QUEUE_MUTEX_LOCK (queue);
|
||||||
queue->srcresult = GST_FLOW_OK;
|
queue->srcresult = GST_FLOW_OK;
|
||||||
result = gst_pad_start_task (pad, (GstTaskFunction) gst_queue_loop, pad);
|
result = gst_pad_start_task (pad, (GstTaskFunction) gst_queue_loop, pad);
|
||||||
GST_QUEUE_MUTEX_UNLOCK;
|
GST_QUEUE_MUTEX_UNLOCK (queue);
|
||||||
} else {
|
} else {
|
||||||
/* step 1, unblock chain and loop functions */
|
/* step 1, unblock chain and loop functions */
|
||||||
GST_QUEUE_MUTEX_LOCK;
|
GST_QUEUE_MUTEX_LOCK (queue);
|
||||||
queue->srcresult = GST_FLOW_WRONG_STATE;
|
queue->srcresult = GST_FLOW_WRONG_STATE;
|
||||||
g_cond_signal (queue->item_add);
|
g_cond_signal (queue->item_add);
|
||||||
GST_QUEUE_MUTEX_UNLOCK;
|
GST_QUEUE_MUTEX_UNLOCK (queue);
|
||||||
|
|
||||||
/* step 2, make sure streaming finishes */
|
/* step 2, make sure streaming finishes */
|
||||||
result = gst_pad_stop_task (pad);
|
result = gst_pad_stop_task (pad);
|
||||||
|
@ -965,7 +953,7 @@ gst_queue_set_property (GObject * object,
|
||||||
|
|
||||||
/* someone could change levels here, and since this
|
/* someone could change levels here, and since this
|
||||||
* affects the get/put funcs, we need to lock for safety. */
|
* affects the get/put funcs, we need to lock for safety. */
|
||||||
GST_QUEUE_MUTEX_LOCK;
|
GST_QUEUE_MUTEX_LOCK (queue);
|
||||||
|
|
||||||
switch (prop_id) {
|
switch (prop_id) {
|
||||||
case ARG_MAX_SIZE_BYTES:
|
case ARG_MAX_SIZE_BYTES:
|
||||||
|
@ -1000,7 +988,7 @@ gst_queue_set_property (GObject * object,
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
GST_QUEUE_MUTEX_UNLOCK;
|
GST_QUEUE_MUTEX_UNLOCK (queue);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void
|
static void
|
||||||
|
@ -1009,7 +997,7 @@ gst_queue_get_property (GObject * object,
|
||||||
{
|
{
|
||||||
GstQueue *queue = GST_QUEUE (object);
|
GstQueue *queue = GST_QUEUE (object);
|
||||||
|
|
||||||
GST_QUEUE_MUTEX_LOCK;
|
GST_QUEUE_MUTEX_LOCK (queue);
|
||||||
|
|
||||||
switch (prop_id) {
|
switch (prop_id) {
|
||||||
case ARG_CUR_LEVEL_BYTES:
|
case ARG_CUR_LEVEL_BYTES:
|
||||||
|
@ -1053,5 +1041,5 @@ gst_queue_get_property (GObject * object,
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
GST_QUEUE_MUTEX_UNLOCK;
|
GST_QUEUE_MUTEX_UNLOCK (queue);
|
||||||
}
|
}
|
||||||
|
|
|
@ -195,6 +195,7 @@ gst_base_src_init (GstBaseSrc * basesrc, gpointer g_class)
|
||||||
|
|
||||||
basesrc->segment_start = -1;
|
basesrc->segment_start = -1;
|
||||||
basesrc->segment_end = -1;
|
basesrc->segment_end = -1;
|
||||||
|
basesrc->need_discont = TRUE;
|
||||||
basesrc->blocksize = DEFAULT_BLOCKSIZE;
|
basesrc->blocksize = DEFAULT_BLOCKSIZE;
|
||||||
basesrc->clock_id = NULL;
|
basesrc->clock_id = NULL;
|
||||||
|
|
||||||
|
@ -432,8 +433,8 @@ gst_base_src_do_seek (GstBaseSrc * src, GstEvent * event)
|
||||||
goto error;
|
goto error;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* now send discont */
|
/* now make sure the discont will be send */
|
||||||
gst_base_src_send_discont (src);
|
src->need_discont = TRUE;
|
||||||
|
|
||||||
/* and restart the task */
|
/* and restart the task */
|
||||||
gst_pad_start_task (src->srcpad, (GstTaskFunction) gst_base_src_loop,
|
gst_pad_start_task (src->srcpad, (GstTaskFunction) gst_base_src_loop,
|
||||||
|
@ -669,9 +670,10 @@ gst_base_src_loop (GstPad * pad)
|
||||||
|
|
||||||
src = GST_BASE_SRC (GST_OBJECT_PARENT (pad));
|
src = GST_BASE_SRC (GST_OBJECT_PARENT (pad));
|
||||||
|
|
||||||
if (src->offset == 0) {
|
if (src->need_discont) {
|
||||||
/* now send discont */
|
/* now send discont */
|
||||||
gst_base_src_send_discont (src);
|
gst_base_src_send_discont (src);
|
||||||
|
src->need_discont = FALSE;
|
||||||
}
|
}
|
||||||
|
|
||||||
ret = gst_base_src_get_range (pad, src->offset, src->blocksize, &buf);
|
ret = gst_base_src_get_range (pad, src->offset, src->blocksize, &buf);
|
||||||
|
@ -871,7 +873,6 @@ gst_base_src_start (GstBaseSrc * basesrc)
|
||||||
|
|
||||||
/* start in the beginning */
|
/* start in the beginning */
|
||||||
basesrc->offset = 0;
|
basesrc->offset = 0;
|
||||||
basesrc->segment_start = 0;
|
|
||||||
|
|
||||||
/* figure out the size */
|
/* figure out the size */
|
||||||
if (bclass->get_size) {
|
if (bclass->get_size) {
|
||||||
|
@ -886,7 +887,9 @@ gst_base_src_start (GstBaseSrc * basesrc)
|
||||||
GST_DEBUG ("size %d %lld", result, basesrc->size);
|
GST_DEBUG ("size %d %lld", result, basesrc->size);
|
||||||
|
|
||||||
/* we always run to the end */
|
/* we always run to the end */
|
||||||
|
basesrc->segment_start = 0;
|
||||||
basesrc->segment_end = basesrc->size;
|
basesrc->segment_end = basesrc->size;
|
||||||
|
basesrc->need_discont = TRUE;
|
||||||
|
|
||||||
/* check if we can seek, updates ->seekable */
|
/* check if we can seek, updates ->seekable */
|
||||||
gst_base_src_is_seekable (basesrc);
|
gst_base_src_is_seekable (basesrc);
|
||||||
|
|
|
@ -92,6 +92,7 @@ struct _GstBaseSrc {
|
||||||
gint64 segment_start; /* start and end positions for seeking */
|
gint64 segment_start; /* start and end positions for seeking */
|
||||||
gint64 segment_end;
|
gint64 segment_end;
|
||||||
gboolean segment_loop;
|
gboolean segment_loop;
|
||||||
|
gboolean need_discont;
|
||||||
|
|
||||||
guint64 offset; /* current offset in the resource */
|
guint64 offset; /* current offset in the resource */
|
||||||
guint64 size; /* total size of the resource */
|
guint64 size; /* total size of the resource */
|
||||||
|
|
|
@ -95,21 +95,27 @@ enum
|
||||||
/* FILL ME */
|
/* FILL ME */
|
||||||
};
|
};
|
||||||
|
|
||||||
#define GST_QUEUE_MUTEX_LOCK G_STMT_START { \
|
#define GST_QUEUE_MUTEX_LOCK(q) G_STMT_START { \
|
||||||
GST_CAT_LOG_OBJECT (queue_dataflow, queue, \
|
GST_CAT_LOG_OBJECT (queue_dataflow, q, \
|
||||||
"locking qlock from thread %p", \
|
"locking qlock from thread %p", \
|
||||||
g_thread_self ()); \
|
g_thread_self ()); \
|
||||||
g_mutex_lock (queue->qlock); \
|
g_mutex_lock (q->qlock); \
|
||||||
GST_CAT_LOG_OBJECT (queue_dataflow, queue, \
|
GST_CAT_LOG_OBJECT (queue_dataflow, q, \
|
||||||
"locked qlock from thread %p", \
|
"locked qlock from thread %p", \
|
||||||
g_thread_self ()); \
|
g_thread_self ()); \
|
||||||
} G_STMT_END
|
} G_STMT_END
|
||||||
|
|
||||||
#define GST_QUEUE_MUTEX_UNLOCK G_STMT_START { \
|
#define GST_QUEUE_MUTEX_LOCK_CHECK(q,label) G_STMT_START { \
|
||||||
GST_CAT_LOG_OBJECT (queue_dataflow, queue, \
|
GST_QUEUE_MUTEX_LOCK (q); \
|
||||||
|
if (q->srcresult != GST_FLOW_OK) \
|
||||||
|
goto label; \
|
||||||
|
} G_STMT_END
|
||||||
|
|
||||||
|
#define GST_QUEUE_MUTEX_UNLOCK(q) G_STMT_START { \
|
||||||
|
GST_CAT_LOG_OBJECT (queue_dataflow, q, \
|
||||||
"unlocking qlock from thread %p", \
|
"unlocking qlock from thread %p", \
|
||||||
g_thread_self ()); \
|
g_thread_self ()); \
|
||||||
g_mutex_unlock (queue->qlock); \
|
g_mutex_unlock (q->qlock); \
|
||||||
} G_STMT_END
|
} G_STMT_END
|
||||||
|
|
||||||
|
|
||||||
|
@ -468,25 +474,25 @@ gst_queue_handle_sink_event (GstPad * pad, GstEvent * event)
|
||||||
switch (GST_EVENT_TYPE (event)) {
|
switch (GST_EVENT_TYPE (event)) {
|
||||||
case GST_EVENT_FLUSH:
|
case GST_EVENT_FLUSH:
|
||||||
STATUS (queue, "received flush event");
|
STATUS (queue, "received flush event");
|
||||||
/* forward event */
|
/* forward event, re first as we're going to use it still */
|
||||||
gst_event_ref (event);
|
gst_event_ref (event);
|
||||||
gst_pad_push_event (queue->srcpad, event);
|
gst_pad_push_event (queue->srcpad, event);
|
||||||
if (GST_EVENT_FLUSH_DONE (event)) {
|
if (GST_EVENT_FLUSH_DONE (event)) {
|
||||||
GST_QUEUE_MUTEX_LOCK;
|
GST_QUEUE_MUTEX_LOCK (queue);
|
||||||
gst_queue_locked_flush (queue);
|
gst_queue_locked_flush (queue);
|
||||||
queue->srcresult = GST_FLOW_OK;
|
queue->srcresult = GST_FLOW_OK;
|
||||||
gst_pad_start_task (queue->srcpad, (GstTaskFunction) gst_queue_loop,
|
gst_pad_start_task (queue->srcpad, (GstTaskFunction) gst_queue_loop,
|
||||||
queue->srcpad);
|
queue->srcpad);
|
||||||
GST_QUEUE_MUTEX_UNLOCK;
|
GST_QUEUE_MUTEX_UNLOCK (queue);
|
||||||
|
|
||||||
|
STATUS (queue, "after flush");
|
||||||
} else {
|
} else {
|
||||||
/* now unblock the chain function */
|
/* now unblock the chain function */
|
||||||
GST_QUEUE_MUTEX_LOCK;
|
GST_QUEUE_MUTEX_LOCK (queue);
|
||||||
queue->srcresult = GST_FLOW_WRONG_STATE;
|
queue->srcresult = GST_FLOW_WRONG_STATE;
|
||||||
/* unblock the loop function */
|
/* unblock the loop function */
|
||||||
g_cond_signal (queue->item_add);
|
g_cond_signal (queue->item_add);
|
||||||
GST_QUEUE_MUTEX_UNLOCK;
|
GST_QUEUE_MUTEX_UNLOCK (queue);
|
||||||
|
|
||||||
STATUS (queue, "after flush");
|
|
||||||
|
|
||||||
/* make sure it pauses */
|
/* make sure it pauses */
|
||||||
gst_pad_pause_task (queue->srcpad);
|
gst_pad_pause_task (queue->srcpad);
|
||||||
|
@ -504,11 +510,11 @@ gst_queue_handle_sink_event (GstPad * pad, GstEvent * event)
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
GST_QUEUE_MUTEX_LOCK;
|
GST_QUEUE_MUTEX_LOCK (queue);
|
||||||
g_queue_push_tail (queue->queue, event);
|
g_queue_push_tail (queue->queue, event);
|
||||||
g_cond_signal (queue->item_add);
|
g_cond_signal (queue->item_add);
|
||||||
|
GST_QUEUE_MUTEX_UNLOCK (queue);
|
||||||
|
|
||||||
GST_QUEUE_MUTEX_UNLOCK;
|
|
||||||
done:
|
done:
|
||||||
|
|
||||||
return TRUE;
|
return TRUE;
|
||||||
|
@ -546,10 +552,7 @@ gst_queue_chain (GstPad * pad, GstBuffer * buffer)
|
||||||
queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
|
queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
|
||||||
|
|
||||||
/* we have to lock the queue since we span threads */
|
/* we have to lock the queue since we span threads */
|
||||||
GST_QUEUE_MUTEX_LOCK;
|
GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
|
||||||
|
|
||||||
if (queue->srcresult != GST_FLOW_OK)
|
|
||||||
goto out_flushing;
|
|
||||||
|
|
||||||
GST_CAT_LOG_OBJECT (queue_dataflow, queue,
|
GST_CAT_LOG_OBJECT (queue_dataflow, queue,
|
||||||
"adding buffer %p of size %d", buffer, GST_BUFFER_SIZE (buffer));
|
"adding buffer %p of size %d", buffer, GST_BUFFER_SIZE (buffer));
|
||||||
|
@ -558,12 +561,9 @@ gst_queue_chain (GstPad * pad, GstBuffer * buffer)
|
||||||
* the user defined as "full". Note that this only applies to buffers.
|
* the user defined as "full". Note that this only applies to buffers.
|
||||||
* We always handle events and they don't count in our statistics. */
|
* We always handle events and they don't count in our statistics. */
|
||||||
while (gst_queue_is_filled (queue)) {
|
while (gst_queue_is_filled (queue)) {
|
||||||
GST_QUEUE_MUTEX_UNLOCK;
|
GST_QUEUE_MUTEX_UNLOCK (queue);
|
||||||
g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_OVERRUN], 0);
|
g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_OVERRUN], 0);
|
||||||
GST_QUEUE_MUTEX_LOCK;
|
GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
|
||||||
|
|
||||||
if (queue->srcresult != GST_FLOW_OK)
|
|
||||||
goto out_flushing;
|
|
||||||
|
|
||||||
/* how are we going to make space for this buffer? */
|
/* how are we going to make space for this buffer? */
|
||||||
switch (queue->leaky) {
|
switch (queue->leaky) {
|
||||||
|
@ -637,12 +637,9 @@ gst_queue_chain (GstPad * pad, GstBuffer * buffer)
|
||||||
}
|
}
|
||||||
|
|
||||||
STATUS (queue, "post-full wait");
|
STATUS (queue, "post-full wait");
|
||||||
GST_QUEUE_MUTEX_UNLOCK;
|
GST_QUEUE_MUTEX_UNLOCK (queue);
|
||||||
g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_RUNNING], 0);
|
g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_RUNNING], 0);
|
||||||
GST_QUEUE_MUTEX_LOCK;
|
GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
|
||||||
|
|
||||||
if (queue->srcresult != GST_FLOW_OK)
|
|
||||||
goto out_flushing;
|
|
||||||
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -660,14 +657,14 @@ gst_queue_chain (GstPad * pad, GstBuffer * buffer)
|
||||||
|
|
||||||
GST_CAT_LOG_OBJECT (queue_dataflow, queue, "signalling item_add");
|
GST_CAT_LOG_OBJECT (queue_dataflow, queue, "signalling item_add");
|
||||||
g_cond_signal (queue->item_add);
|
g_cond_signal (queue->item_add);
|
||||||
GST_QUEUE_MUTEX_UNLOCK;
|
GST_QUEUE_MUTEX_UNLOCK (queue);
|
||||||
|
|
||||||
return GST_FLOW_OK;
|
return GST_FLOW_OK;
|
||||||
|
|
||||||
/* special conditions */
|
/* special conditions */
|
||||||
out_unref:
|
out_unref:
|
||||||
{
|
{
|
||||||
GST_QUEUE_MUTEX_UNLOCK;
|
GST_QUEUE_MUTEX_UNLOCK (queue);
|
||||||
|
|
||||||
gst_buffer_unref (buffer);
|
gst_buffer_unref (buffer);
|
||||||
|
|
||||||
|
@ -679,7 +676,7 @@ out_flushing:
|
||||||
|
|
||||||
GST_CAT_LOG_OBJECT (queue_dataflow, queue,
|
GST_CAT_LOG_OBJECT (queue_dataflow, queue,
|
||||||
"exit because task paused, reason: %d", ret);
|
"exit because task paused, reason: %d", ret);
|
||||||
GST_QUEUE_MUTEX_UNLOCK;
|
GST_QUEUE_MUTEX_UNLOCK (queue);
|
||||||
|
|
||||||
gst_buffer_unref (buffer);
|
gst_buffer_unref (buffer);
|
||||||
|
|
||||||
|
@ -697,19 +694,13 @@ gst_queue_loop (GstPad * pad)
|
||||||
queue = GST_QUEUE (GST_PAD_PARENT (pad));
|
queue = GST_QUEUE (GST_PAD_PARENT (pad));
|
||||||
|
|
||||||
/* have to lock for thread-safety */
|
/* have to lock for thread-safety */
|
||||||
GST_QUEUE_MUTEX_LOCK;
|
GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
|
||||||
|
|
||||||
restart:
|
restart:
|
||||||
if (queue->srcresult != GST_FLOW_OK)
|
|
||||||
goto out_flushing;
|
|
||||||
|
|
||||||
while (gst_queue_is_empty (queue)) {
|
while (gst_queue_is_empty (queue)) {
|
||||||
GST_QUEUE_MUTEX_UNLOCK;
|
GST_QUEUE_MUTEX_UNLOCK (queue);
|
||||||
g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_UNDERRUN], 0);
|
g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_UNDERRUN], 0);
|
||||||
GST_QUEUE_MUTEX_LOCK;
|
GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
|
||||||
|
|
||||||
if (queue->srcresult != GST_FLOW_OK)
|
|
||||||
goto out_flushing;
|
|
||||||
|
|
||||||
STATUS (queue, "pre-empty wait");
|
STATUS (queue, "pre-empty wait");
|
||||||
while (gst_queue_is_empty (queue)) {
|
while (gst_queue_is_empty (queue)) {
|
||||||
|
@ -730,12 +721,9 @@ restart:
|
||||||
}
|
}
|
||||||
|
|
||||||
STATUS (queue, "post-empty wait");
|
STATUS (queue, "post-empty wait");
|
||||||
GST_QUEUE_MUTEX_UNLOCK;
|
GST_QUEUE_MUTEX_UNLOCK (queue);
|
||||||
g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_RUNNING], 0);
|
g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_RUNNING], 0);
|
||||||
GST_QUEUE_MUTEX_LOCK;
|
GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
|
||||||
|
|
||||||
if (queue->srcresult != GST_FLOW_OK)
|
|
||||||
goto out_flushing;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/* There's something in the list now, whatever it is */
|
/* There's something in the list now, whatever it is */
|
||||||
|
@ -752,9 +740,9 @@ restart:
|
||||||
if (GST_BUFFER_DURATION (data) != GST_CLOCK_TIME_NONE)
|
if (GST_BUFFER_DURATION (data) != GST_CLOCK_TIME_NONE)
|
||||||
queue->cur_level.time -= GST_BUFFER_DURATION (data);
|
queue->cur_level.time -= GST_BUFFER_DURATION (data);
|
||||||
|
|
||||||
GST_QUEUE_MUTEX_UNLOCK;
|
GST_QUEUE_MUTEX_UNLOCK (queue);
|
||||||
result = gst_pad_push (pad, GST_BUFFER (data));
|
result = gst_pad_push (pad, GST_BUFFER (data));
|
||||||
GST_QUEUE_MUTEX_LOCK;
|
GST_QUEUE_MUTEX_LOCK (queue);
|
||||||
/* can opt to check for srcresult here but the push should
|
/* can opt to check for srcresult here but the push should
|
||||||
* return an error value that is more accurate */
|
* return an error value that is more accurate */
|
||||||
if (result != GST_FLOW_OK) {
|
if (result != GST_FLOW_OK) {
|
||||||
|
@ -775,9 +763,9 @@ restart:
|
||||||
gst_pad_pause_task (queue->srcpad);
|
gst_pad_pause_task (queue->srcpad);
|
||||||
restart = FALSE;
|
restart = FALSE;
|
||||||
}
|
}
|
||||||
GST_QUEUE_MUTEX_UNLOCK;
|
GST_QUEUE_MUTEX_UNLOCK (queue);
|
||||||
gst_pad_push_event (queue->srcpad, GST_EVENT (data));
|
gst_pad_push_event (queue->srcpad, GST_EVENT (data));
|
||||||
GST_QUEUE_MUTEX_LOCK;
|
GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
|
||||||
if (restart == TRUE)
|
if (restart == TRUE)
|
||||||
goto restart;
|
goto restart;
|
||||||
}
|
}
|
||||||
|
@ -786,7 +774,7 @@ restart:
|
||||||
|
|
||||||
GST_CAT_LOG_OBJECT (queue_dataflow, queue, "signalling item_del");
|
GST_CAT_LOG_OBJECT (queue_dataflow, queue, "signalling item_del");
|
||||||
g_cond_signal (queue->item_del);
|
g_cond_signal (queue->item_del);
|
||||||
GST_QUEUE_MUTEX_UNLOCK;
|
GST_QUEUE_MUTEX_UNLOCK (queue);
|
||||||
|
|
||||||
return;
|
return;
|
||||||
|
|
||||||
|
@ -794,7 +782,7 @@ out_flushing:
|
||||||
{
|
{
|
||||||
GST_CAT_LOG_OBJECT (queue_dataflow, queue,
|
GST_CAT_LOG_OBJECT (queue_dataflow, queue,
|
||||||
"exit because task paused, reason: %d", queue->srcresult);
|
"exit because task paused, reason: %d", queue->srcresult);
|
||||||
GST_QUEUE_MUTEX_UNLOCK;
|
GST_QUEUE_MUTEX_UNLOCK (queue);
|
||||||
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -877,11 +865,11 @@ gst_queue_sink_activate_push (GstPad * pad, gboolean active)
|
||||||
result = TRUE;
|
result = TRUE;
|
||||||
} else {
|
} else {
|
||||||
/* step 1, unblock chain and loop functions */
|
/* step 1, unblock chain and loop functions */
|
||||||
GST_QUEUE_MUTEX_LOCK;
|
GST_QUEUE_MUTEX_LOCK (queue);
|
||||||
queue->srcresult = GST_FLOW_WRONG_STATE;
|
queue->srcresult = GST_FLOW_WRONG_STATE;
|
||||||
gst_queue_locked_flush (queue);
|
gst_queue_locked_flush (queue);
|
||||||
g_cond_signal (queue->item_del);
|
g_cond_signal (queue->item_del);
|
||||||
GST_QUEUE_MUTEX_UNLOCK;
|
GST_QUEUE_MUTEX_UNLOCK (queue);
|
||||||
|
|
||||||
/* step 2, make sure streaming finishes */
|
/* step 2, make sure streaming finishes */
|
||||||
result = gst_pad_stop_task (pad);
|
result = gst_pad_stop_task (pad);
|
||||||
|
@ -900,16 +888,16 @@ gst_queue_src_activate_push (GstPad * pad, gboolean active)
|
||||||
queue = GST_QUEUE (gst_pad_get_parent (pad));
|
queue = GST_QUEUE (gst_pad_get_parent (pad));
|
||||||
|
|
||||||
if (active) {
|
if (active) {
|
||||||
GST_QUEUE_MUTEX_LOCK;
|
GST_QUEUE_MUTEX_LOCK (queue);
|
||||||
queue->srcresult = GST_FLOW_OK;
|
queue->srcresult = GST_FLOW_OK;
|
||||||
result = gst_pad_start_task (pad, (GstTaskFunction) gst_queue_loop, pad);
|
result = gst_pad_start_task (pad, (GstTaskFunction) gst_queue_loop, pad);
|
||||||
GST_QUEUE_MUTEX_UNLOCK;
|
GST_QUEUE_MUTEX_UNLOCK (queue);
|
||||||
} else {
|
} else {
|
||||||
/* step 1, unblock chain and loop functions */
|
/* step 1, unblock chain and loop functions */
|
||||||
GST_QUEUE_MUTEX_LOCK;
|
GST_QUEUE_MUTEX_LOCK (queue);
|
||||||
queue->srcresult = GST_FLOW_WRONG_STATE;
|
queue->srcresult = GST_FLOW_WRONG_STATE;
|
||||||
g_cond_signal (queue->item_add);
|
g_cond_signal (queue->item_add);
|
||||||
GST_QUEUE_MUTEX_UNLOCK;
|
GST_QUEUE_MUTEX_UNLOCK (queue);
|
||||||
|
|
||||||
/* step 2, make sure streaming finishes */
|
/* step 2, make sure streaming finishes */
|
||||||
result = gst_pad_stop_task (pad);
|
result = gst_pad_stop_task (pad);
|
||||||
|
@ -965,7 +953,7 @@ gst_queue_set_property (GObject * object,
|
||||||
|
|
||||||
/* someone could change levels here, and since this
|
/* someone could change levels here, and since this
|
||||||
* affects the get/put funcs, we need to lock for safety. */
|
* affects the get/put funcs, we need to lock for safety. */
|
||||||
GST_QUEUE_MUTEX_LOCK;
|
GST_QUEUE_MUTEX_LOCK (queue);
|
||||||
|
|
||||||
switch (prop_id) {
|
switch (prop_id) {
|
||||||
case ARG_MAX_SIZE_BYTES:
|
case ARG_MAX_SIZE_BYTES:
|
||||||
|
@ -1000,7 +988,7 @@ gst_queue_set_property (GObject * object,
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
GST_QUEUE_MUTEX_UNLOCK;
|
GST_QUEUE_MUTEX_UNLOCK (queue);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void
|
static void
|
||||||
|
@ -1009,7 +997,7 @@ gst_queue_get_property (GObject * object,
|
||||||
{
|
{
|
||||||
GstQueue *queue = GST_QUEUE (object);
|
GstQueue *queue = GST_QUEUE (object);
|
||||||
|
|
||||||
GST_QUEUE_MUTEX_LOCK;
|
GST_QUEUE_MUTEX_LOCK (queue);
|
||||||
|
|
||||||
switch (prop_id) {
|
switch (prop_id) {
|
||||||
case ARG_CUR_LEVEL_BYTES:
|
case ARG_CUR_LEVEL_BYTES:
|
||||||
|
@ -1053,5 +1041,5 @@ gst_queue_get_property (GObject * object,
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
GST_QUEUE_MUTEX_UNLOCK;
|
GST_QUEUE_MUTEX_UNLOCK (queue);
|
||||||
}
|
}
|
||||||
|
|
|
@ -10,13 +10,13 @@ int
|
||||||
main (int argc, char *argv[])
|
main (int argc, char *argv[])
|
||||||
{
|
{
|
||||||
GstElement *fakesrc, *fakesink;
|
GstElement *fakesrc, *fakesink;
|
||||||
GstElement *thread;
|
GstElement *pipeline;
|
||||||
gint x;
|
gint x;
|
||||||
|
|
||||||
gst_init (&argc, &argv);
|
gst_init (&argc, &argv);
|
||||||
|
|
||||||
thread = gst_thread_new ("thread");
|
pipeline = gst_pipeline_new ("pipeline");
|
||||||
g_assert (thread != NULL);
|
g_assert (pipeline != NULL);
|
||||||
|
|
||||||
fakesrc = gst_element_factory_make ("fakesrc", "fake_source");
|
fakesrc = gst_element_factory_make ("fakesrc", "fake_source");
|
||||||
g_assert (fakesrc != NULL);
|
g_assert (fakesrc != NULL);
|
||||||
|
@ -24,16 +24,16 @@ main (int argc, char *argv[])
|
||||||
fakesink = gst_element_factory_make ("fakesink", "fake_sink");
|
fakesink = gst_element_factory_make ("fakesink", "fake_sink");
|
||||||
g_assert (fakesink != NULL);
|
g_assert (fakesink != NULL);
|
||||||
|
|
||||||
gst_bin_add_many (GST_BIN (thread), fakesrc, fakesink, NULL);
|
gst_bin_add_many (GST_BIN (pipeline), fakesrc, fakesink, NULL);
|
||||||
gst_element_link (fakesrc, fakesink);
|
gst_element_link (fakesrc, fakesink);
|
||||||
|
|
||||||
for (x = 0; x < 10; x++) {
|
for (x = 0; x < 10; x++) {
|
||||||
g_print ("playing %d\n", x);
|
g_print ("playing %d\n", x);
|
||||||
gst_element_set_state (GST_ELEMENT (thread), GST_STATE_PLAYING);
|
gst_element_set_state (GST_ELEMENT (pipeline), GST_STATE_PLAYING);
|
||||||
g_usleep (G_USEC_PER_SEC);
|
g_usleep (G_USEC_PER_SEC);
|
||||||
|
|
||||||
g_print ("pausing %d\n", x);
|
g_print ("pausing %d\n", x);
|
||||||
gst_element_set_state (GST_ELEMENT (thread), GST_STATE_PAUSED);
|
gst_element_set_state (GST_ELEMENT (pipeline), GST_STATE_PAUSED);
|
||||||
g_usleep (G_USEC_PER_SEC);
|
g_usleep (G_USEC_PER_SEC);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue