gst/: Work on flushing.

Original commit message from CVS:
* gst/elements/gstfilesrc.c: (gst_filesrc_getrange),
(gst_filesrc_get), (gst_filesrc_activate):
* gst/gstclock.c: (gst_clock_init), (gst_clock_dispose):
* gst/gstclock.h:
* gst/gstevent.c: (gst_event_new_discontinuous_valist),
(gst_event_new_discontinuous), (gst_event_discont_get_value),
(gst_event_new_flush):
* gst/gstevent.h:
* gst/gstpad.c: (gst_pad_set_active), (gst_pad_set_blocked_async),
(gst_pad_set_acceptcaps_function),
(gst_pad_set_fixatecaps_function), (gst_pad_unlink),
(gst_pad_link_prepare_filtered), (gst_pad_link_filtered),
(gst_pad_relink_filtered), (gst_real_pad_get_caps_unlocked),
(gst_pad_peer_get_caps), (gst_pad_fixate_caps),
(gst_pad_accept_caps), (gst_pad_peer_accept_caps),
(gst_pad_set_caps), (gst_pad_configure_sink),
(gst_pad_configure_src), (gst_pad_realize), (gst_pad_alloc_buffer),
(gst_pad_push), (gst_pad_push_event), (gst_pad_send_event):
* gst/gstpad.h:
* gst/gsttask.c: (gst_task_pause):
* gst/gsttask.h:
* gst/schedulers/threadscheduler.c:
(gst_thread_scheduler_task_class_init),
(gst_thread_scheduler_task_init),
(gst_thread_scheduler_task_start),
(gst_thread_scheduler_task_stop),
(gst_thread_scheduler_task_pause), (gst_thread_scheduler_func):
Work on flushing.
Allow tasks to be paused.
Remove some old code in GstClock
This commit is contained in:
Wim Taymans 2005-01-04 12:06:57 +00:00
parent 0f74c8b5f1
commit 5951a9840d
12 changed files with 144 additions and 23 deletions

View file

@ -1,3 +1,37 @@
2005-01-04 Wim Taymans <wim@fluendo.com>
* gst/elements/gstfilesrc.c: (gst_filesrc_getrange),
(gst_filesrc_get), (gst_filesrc_activate):
* gst/gstclock.c: (gst_clock_init), (gst_clock_dispose):
* gst/gstclock.h:
* gst/gstevent.c: (gst_event_new_discontinuous_valist),
(gst_event_new_discontinuous), (gst_event_discont_get_value),
(gst_event_new_flush):
* gst/gstevent.h:
* gst/gstpad.c: (gst_pad_set_active), (gst_pad_set_blocked_async),
(gst_pad_set_acceptcaps_function),
(gst_pad_set_fixatecaps_function), (gst_pad_unlink),
(gst_pad_link_prepare_filtered), (gst_pad_link_filtered),
(gst_pad_relink_filtered), (gst_real_pad_get_caps_unlocked),
(gst_pad_peer_get_caps), (gst_pad_fixate_caps),
(gst_pad_accept_caps), (gst_pad_peer_accept_caps),
(gst_pad_set_caps), (gst_pad_configure_sink),
(gst_pad_configure_src), (gst_pad_realize), (gst_pad_alloc_buffer),
(gst_pad_push), (gst_pad_push_event), (gst_pad_send_event):
* gst/gstpad.h:
* gst/gsttask.c: (gst_task_pause):
* gst/gsttask.h:
* gst/schedulers/threadscheduler.c:
(gst_thread_scheduler_task_class_init),
(gst_thread_scheduler_task_init),
(gst_thread_scheduler_task_start),
(gst_thread_scheduler_task_stop),
(gst_thread_scheduler_task_pause), (gst_thread_scheduler_func):
Work on flushing.
Allow tasks to be paused.
Remove some old code in GstClock
2004-12-31 Wim Taymans <wim@fluendo.com>
* gst/gstevent.c: (gst_event_new_discontinuous_valist),

View file

@ -719,7 +719,7 @@ gst_filesrc_get (GstPad * pad, GstBuffer ** buffer)
if (src->need_flush) {
src->need_flush = FALSE;
GST_DEBUG_OBJECT (src, "sending flush");
gst_pad_push_event (pad, gst_event_new_flush ());
gst_pad_push_event (pad, gst_event_new_flush (TRUE));
}
/* check for seek */
if (src->need_discont) {

View file

@ -403,18 +403,12 @@ gst_clock_init (GstClock * clock)
clock->entries = NULL;
clock->flags = 0;
clock->stats = FALSE;
clock->active_mutex = g_mutex_new ();
clock->active_cond = g_cond_new ();
}
static void
gst_clock_dispose (GObject * object)
{
GstClock *clock = GST_CLOCK (object);
g_mutex_free (clock->active_mutex);
g_cond_free (clock->active_cond);
//GstClock *clock = GST_CLOCK (object);
G_OBJECT_CLASS (parent_class)->dispose (object);
}

View file

@ -114,7 +114,7 @@ struct _GstClockEntry {
typedef enum
{
GST_CLOCK_STOPPED = 0,
GST_CLOCK_UNSCHEDULED = 0,
GST_CLOCK_TIMEOUT = 1,
GST_CLOCK_EARLY = 2,
GST_CLOCK_ERROR = 3,
@ -144,8 +144,6 @@ struct _GstClock {
GstClockTime adjust;
GstClockTime last_time;
GList *entries;
GMutex *active_mutex;
GCond *active_cond;
/*< private >*/
guint64 resolution;

View file

@ -262,14 +262,14 @@ gst_event_new_discontinuous_valist (gdouble rate, GstFormat format1,
* Returns: A new discontinuous event.
*/
GstEvent *
gst_event_new_discontinuous (gboolean new_media, GstFormat format1, ...)
gst_event_new_discontinuous (gdouble rate, GstFormat format1, ...)
{
va_list var_args;
GstEvent *event;
va_start (var_args, format1);
event = gst_event_new_discontinuous_valist (new_media, format1, var_args);
event = gst_event_new_discontinuous_valist (rate, format1, var_args);
va_end (var_args);
@ -359,3 +359,22 @@ gst_event_new_segment_seek (GstSeekType type, gint64 start, gint64 stop)
return event;
}
/**
* gst_event_new_flush:
* @done: Indicates the end of the flush
*
* Allocate a new flush event.
*
* Returns: A new flush event.
*/
GstEvent *
gst_event_new_flush (gboolean done)
{
GstEvent *event;
event = gst_event_new (GST_EVENT_FLUSH);
GST_EVENT_FLUSH_DONE (event) = done;
return event;
}

View file

@ -146,6 +146,8 @@ typedef struct
#define GST_EVENT_DISCONT_OFFSET(event,i) (GST_EVENT(event)->event_data.discont.offsets[i])
#define GST_EVENT_DISCONT_OFFSET_LEN(event) (GST_EVENT(event)->event_data.discont.noffsets)
#define GST_EVENT_FLUSH_DONE(event) (GST_EVENT(event)->event_data.flush.done)
#define GST_EVENT_SIZE_FORMAT(event) (GST_EVENT(event)->event_data.size.format)
#define GST_EVENT_SIZE_VALUE(event) (GST_EVENT(event)->event_data.size.value)
@ -171,6 +173,9 @@ struct _GstEvent {
gint noffsets;
gdouble rate;
} discont;
struct {
gboolean done;
} flush;
struct {
GstFormat format;
gint64 value;
@ -211,7 +216,7 @@ GstEvent* gst_event_new_segment_seek (GstSeekType type, gint64 start, gint64 sto
GstEvent* gst_event_new_size (GstFormat format, gint64 value);
/* discontinous event */
GstEvent* gst_event_new_discontinuous (gboolean new_media,
GstEvent* gst_event_new_discontinuous (gdouble rate,
GstFormat format1, ...);
GstEvent* gst_event_new_discontinuous_valist (gdouble rate,
GstFormat format1,
@ -222,7 +227,7 @@ gboolean gst_event_discont_get_value (GstEvent *event, GstFormat format,
#define gst_event_new_filler() gst_event_new(GST_EVENT_FILLER)
/* flush events */
#define gst_event_new_flush() gst_event_new(GST_EVENT_FLUSH)
GstEvent* gst_event_new_flush (gboolean done);
G_END_DECLS

View file

@ -2760,6 +2760,9 @@ gst_pad_push (GstPad * pad, GstBuffer * buffer)
if (G_UNLIKELY (!GST_RPAD_IS_ACTIVE (peer)))
goto not_active;
if (G_UNLIKELY (GST_RPAD_IS_FLUSHING (peer)))
goto flushing;
gst_object_ref (GST_OBJECT_CAST (peer));
GST_UNLOCK (pad);
@ -2803,6 +2806,12 @@ not_active:
"pushing, but it was inactive");
GST_UNLOCK_RETURN (pad, GST_FLOW_WRONG_STATE);
}
flushing:
{
GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad,
"pushing, but pad was flushing");
GST_UNLOCK_RETURN (pad, GST_FLOW_UNEXPECTED);
}
not_negotiated:
{
GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad,
@ -3456,6 +3465,7 @@ gst_pad_push_event (GstPad * pad, GstEvent * event)
peerpad = GST_RPAD_PEER (pad);
if (peerpad == NULL)
goto not_linked;
gst_object_ref (GST_OBJECT_CAST (peerpad));
GST_UNLOCK (pad);
@ -3504,6 +3514,21 @@ gst_pad_send_event (GstPad * pad, GstEvent * event)
GST_CAT_DEBUG (GST_CAT_EVENT, "have event type %d on pad %s:%s",
GST_EVENT_TYPE (event), GST_DEBUG_PAD_NAME (rpad));
if (GST_PAD_IS_SINK (pad)) {
if (GST_EVENT_TYPE (event) == GST_EVENT_FLUSH) {
GST_CAT_DEBUG (GST_CAT_EVENT, "have flush event");
GST_LOCK (pad);
if (GST_EVENT_FLUSH_DONE (event)) {
GST_CAT_DEBUG (GST_CAT_EVENT, "clear flush flag");
GST_FLAG_UNSET (pad, GST_PAD_FLUSHING);
} else {
GST_CAT_DEBUG (GST_CAT_EVENT, "set flush flag");
GST_FLAG_SET (pad, GST_PAD_FLUSHING);
}
GST_UNLOCK (pad);
}
}
if ((eventfunc = GST_RPAD_EVENTFUNC (rpad)) == NULL)
goto no_function;

View file

@ -102,11 +102,11 @@ typedef enum {
typedef enum {
GST_FLOW_OK = 0, /* data passing was ok */
GST_FLOW_RESEND = 1, /* resend buffer, possibly with new caps */
GST_FLOW_ERROR = -1, /* some error occured */
GST_FLOW_ERROR = -1, /* some (fatal) error occured */
GST_FLOW_NOT_CONNECTED = -2, /* pad is not connected */
GST_FLOW_NOT_NEGOTIATED = -3, /* pad is not negotiated */
GST_FLOW_WRONG_STATE = -4, /* pad is in wrong state */
GST_FLOW_UNEXPECTED = -5, /* did not expect anything */
GST_FLOW_UNEXPECTED = -5, /* did not expect anything, this is not fatal */
GST_FLOW_NOT_SUPPORTED = -6 /* function not supported */
} GstFlowReturn;

View file

@ -116,3 +116,19 @@ gst_task_stop (GstTask * task)
return result;
}
gboolean
gst_task_pause (GstTask * task)
{
GstTaskClass *tclass;
gboolean result = FALSE;
g_return_val_if_fail (GST_IS_TASK (task), result);
tclass = GST_TASK_GET_CLASS (task);
if (tclass->pause)
result = tclass->pause (task);
return result;
}

View file

@ -49,6 +49,7 @@ struct _GstTaskClass {
gboolean (*start) (GstTask *task);
gboolean (*stop) (GstTask *task);
gboolean (*pause) (GstTask *task);
gpointer _gst_reserved[GST_PADDING];
};
@ -57,6 +58,7 @@ GType gst_task_get_type (void);
gboolean gst_task_start (GstTask *task);
gboolean gst_task_stop (GstTask *task);
gboolean gst_task_pause (GstTask *task);
G_END_DECLS

View file

@ -79,7 +79,8 @@ typedef struct _GstThreadSchedulerTaskClass GstThreadSchedulerTaskClass;
typedef enum
{
STATE_STOPPED,
STATE_STARTED
STATE_STARTED,
STATE_PAUSED
} TaskState;
@ -89,6 +90,7 @@ struct _GstThreadSchedulerTask
TaskState state;
GMutex *lock;
GCond *cond;
GstTaskFunction func;
gpointer data;
@ -106,6 +108,7 @@ static void gst_thread_scheduler_task_init (GstThreadSchedulerTask * object);
static gboolean gst_thread_scheduler_task_start (GstTask * task);
static gboolean gst_thread_scheduler_task_stop (GstTask * task);
static gboolean gst_thread_scheduler_task_pause (GstTask * task);
GType
gst_thread_scheduler_task_get_type (void)
@ -139,6 +142,7 @@ gst_thread_scheduler_task_class_init (gpointer klass, gpointer class_data)
task->start = gst_thread_scheduler_task_start;
task->stop = gst_thread_scheduler_task_stop;
task->pause = gst_thread_scheduler_task_pause;
}
static void
@ -146,6 +150,7 @@ gst_thread_scheduler_task_init (GstThreadSchedulerTask * task)
{
task->state = STATE_STOPPED;
task->lock = g_mutex_new ();
task->cond = g_cond_new ();
}
static gboolean
@ -156,10 +161,12 @@ gst_thread_scheduler_task_start (GstTask * task)
GST_THREAD_SCHEDULER (gst_object_get_parent (GST_OBJECT (task)));
g_mutex_lock (ttask->lock);
if (ttask->state != STATE_STARTED) {
if (ttask->state == STATE_STOPPED) {
ttask->state = STATE_STARTED;
g_thread_pool_push (tsched->pool, task, NULL);
} else {
ttask->state = STATE_STARTED;
g_cond_signal (ttask->cond);
}
g_mutex_unlock (ttask->lock);
@ -174,11 +181,24 @@ gst_thread_scheduler_task_stop (GstTask * task)
g_mutex_lock (ttask->lock);
if (ttask->state != STATE_STOPPED) {
ttask->state = STATE_STOPPED;
g_cond_signal (ttask->cond);
}
g_mutex_unlock (ttask->lock);
return TRUE;
}
static gboolean
gst_thread_scheduler_task_pause (GstTask * task)
{
GstThreadSchedulerTask *ttask = GST_THREAD_SCHEDULER_TASK (task);
g_mutex_lock (ttask->lock);
if (ttask->state != STATE_PAUSED) {
ttask->state = STATE_PAUSED;
}
g_mutex_unlock (ttask->lock);
return TRUE;
}
static void gst_thread_scheduler_class_init (gpointer g_class, gpointer data);
static void gst_thread_scheduler_init (GstThreadScheduler * object);
@ -242,12 +262,20 @@ gst_thread_scheduler_func (GstThreadSchedulerTask * task,
GST_DEBUG_OBJECT (sched, "Entering task %p, thread %p", task,
g_thread_self ());
g_mutex_lock (task->lock);
while (G_LIKELY (task->state == STATE_STARTED)) {
while (G_LIKELY (task->state != STATE_STOPPED)) {
if (task->state == STATE_PAUSED) {
g_cond_wait (task->cond, task->lock);
if (task->state == STATE_STOPPED)
break;
}
g_mutex_unlock (task->lock);
res = task->func (task->data);
g_mutex_lock (task->lock);
if (G_UNLIKELY (!res))
if (G_UNLIKELY (!res)) {
task->state = STATE_STOPPED;
}
}
g_mutex_unlock (task->lock);
GST_DEBUG_OBJECT (sched, "Exit task %p, thread %p", task, g_thread_self ());

View file

@ -719,7 +719,7 @@ gst_filesrc_get (GstPad * pad, GstBuffer ** buffer)
if (src->need_flush) {
src->need_flush = FALSE;
GST_DEBUG_OBJECT (src, "sending flush");
gst_pad_push_event (pad, gst_event_new_flush ());
gst_pad_push_event (pad, gst_event_new_flush (TRUE));
}
/* check for seek */
if (src->need_discont) {