From 5951a9840d186604643e19143b696d7231dbad4c Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Tue, 4 Jan 2005 12:06:57 +0000 Subject: [PATCH] gst/: Work on flushing. Original commit message from CVS: * gst/elements/gstfilesrc.c: (gst_filesrc_getrange), (gst_filesrc_get), (gst_filesrc_activate): * gst/gstclock.c: (gst_clock_init), (gst_clock_dispose): * gst/gstclock.h: * gst/gstevent.c: (gst_event_new_discontinuous_valist), (gst_event_new_discontinuous), (gst_event_discont_get_value), (gst_event_new_flush): * gst/gstevent.h: * gst/gstpad.c: (gst_pad_set_active), (gst_pad_set_blocked_async), (gst_pad_set_acceptcaps_function), (gst_pad_set_fixatecaps_function), (gst_pad_unlink), (gst_pad_link_prepare_filtered), (gst_pad_link_filtered), (gst_pad_relink_filtered), (gst_real_pad_get_caps_unlocked), (gst_pad_peer_get_caps), (gst_pad_fixate_caps), (gst_pad_accept_caps), (gst_pad_peer_accept_caps), (gst_pad_set_caps), (gst_pad_configure_sink), (gst_pad_configure_src), (gst_pad_realize), (gst_pad_alloc_buffer), (gst_pad_push), (gst_pad_push_event), (gst_pad_send_event): * gst/gstpad.h: * gst/gsttask.c: (gst_task_pause): * gst/gsttask.h: * gst/schedulers/threadscheduler.c: (gst_thread_scheduler_task_class_init), (gst_thread_scheduler_task_init), (gst_thread_scheduler_task_start), (gst_thread_scheduler_task_stop), (gst_thread_scheduler_task_pause), (gst_thread_scheduler_func): Work on flushing. Allow tasks to be paused. Remove some old code in GstClock --- ChangeLog | 34 ++++++++++++++++++++++++++++ gst/elements/gstfilesrc.c | 2 +- gst/gstclock.c | 8 +------ gst/gstclock.h | 4 +--- gst/gstevent.c | 23 +++++++++++++++++-- gst/gstevent.h | 9 ++++++-- gst/gstpad.c | 25 +++++++++++++++++++++ gst/gstpad.h | 4 ++-- gst/gsttask.c | 16 ++++++++++++++ gst/gsttask.h | 2 ++ gst/schedulers/threadscheduler.c | 38 +++++++++++++++++++++++++++----- plugins/elements/gstfilesrc.c | 2 +- 12 files changed, 144 insertions(+), 23 deletions(-) diff --git a/ChangeLog b/ChangeLog index d533960581..c02820d3c1 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,37 @@ +2005-01-04 Wim Taymans + + * gst/elements/gstfilesrc.c: (gst_filesrc_getrange), + (gst_filesrc_get), (gst_filesrc_activate): + * gst/gstclock.c: (gst_clock_init), (gst_clock_dispose): + * gst/gstclock.h: + * gst/gstevent.c: (gst_event_new_discontinuous_valist), + (gst_event_new_discontinuous), (gst_event_discont_get_value), + (gst_event_new_flush): + * gst/gstevent.h: + * gst/gstpad.c: (gst_pad_set_active), (gst_pad_set_blocked_async), + (gst_pad_set_acceptcaps_function), + (gst_pad_set_fixatecaps_function), (gst_pad_unlink), + (gst_pad_link_prepare_filtered), (gst_pad_link_filtered), + (gst_pad_relink_filtered), (gst_real_pad_get_caps_unlocked), + (gst_pad_peer_get_caps), (gst_pad_fixate_caps), + (gst_pad_accept_caps), (gst_pad_peer_accept_caps), + (gst_pad_set_caps), (gst_pad_configure_sink), + (gst_pad_configure_src), (gst_pad_realize), (gst_pad_alloc_buffer), + (gst_pad_push), (gst_pad_push_event), (gst_pad_send_event): + * gst/gstpad.h: + * gst/gsttask.c: (gst_task_pause): + * gst/gsttask.h: + * gst/schedulers/threadscheduler.c: + (gst_thread_scheduler_task_class_init), + (gst_thread_scheduler_task_init), + (gst_thread_scheduler_task_start), + (gst_thread_scheduler_task_stop), + (gst_thread_scheduler_task_pause), (gst_thread_scheduler_func): + Work on flushing. + Allow tasks to be paused. + Remove some old code in GstClock + + 2004-12-31 Wim Taymans * gst/gstevent.c: (gst_event_new_discontinuous_valist), diff --git a/gst/elements/gstfilesrc.c b/gst/elements/gstfilesrc.c index c8e375baa1..e4b44a62e1 100644 --- a/gst/elements/gstfilesrc.c +++ b/gst/elements/gstfilesrc.c @@ -719,7 +719,7 @@ gst_filesrc_get (GstPad * pad, GstBuffer ** buffer) if (src->need_flush) { src->need_flush = FALSE; GST_DEBUG_OBJECT (src, "sending flush"); - gst_pad_push_event (pad, gst_event_new_flush ()); + gst_pad_push_event (pad, gst_event_new_flush (TRUE)); } /* check for seek */ if (src->need_discont) { diff --git a/gst/gstclock.c b/gst/gstclock.c index bb422d01f5..2af49587b5 100644 --- a/gst/gstclock.c +++ b/gst/gstclock.c @@ -403,18 +403,12 @@ gst_clock_init (GstClock * clock) clock->entries = NULL; clock->flags = 0; clock->stats = FALSE; - - clock->active_mutex = g_mutex_new (); - clock->active_cond = g_cond_new (); } static void gst_clock_dispose (GObject * object) { - GstClock *clock = GST_CLOCK (object); - - g_mutex_free (clock->active_mutex); - g_cond_free (clock->active_cond); + //GstClock *clock = GST_CLOCK (object); G_OBJECT_CLASS (parent_class)->dispose (object); } diff --git a/gst/gstclock.h b/gst/gstclock.h index 83eb1d9e76..c81f6777ef 100644 --- a/gst/gstclock.h +++ b/gst/gstclock.h @@ -114,7 +114,7 @@ struct _GstClockEntry { typedef enum { - GST_CLOCK_STOPPED = 0, + GST_CLOCK_UNSCHEDULED = 0, GST_CLOCK_TIMEOUT = 1, GST_CLOCK_EARLY = 2, GST_CLOCK_ERROR = 3, @@ -144,8 +144,6 @@ struct _GstClock { GstClockTime adjust; GstClockTime last_time; GList *entries; - GMutex *active_mutex; - GCond *active_cond; /*< private >*/ guint64 resolution; diff --git a/gst/gstevent.c b/gst/gstevent.c index 3d78dbc678..5d9a7d3662 100644 --- a/gst/gstevent.c +++ b/gst/gstevent.c @@ -262,14 +262,14 @@ gst_event_new_discontinuous_valist (gdouble rate, GstFormat format1, * Returns: A new discontinuous event. */ GstEvent * -gst_event_new_discontinuous (gboolean new_media, GstFormat format1, ...) +gst_event_new_discontinuous (gdouble rate, GstFormat format1, ...) { va_list var_args; GstEvent *event; va_start (var_args, format1); - event = gst_event_new_discontinuous_valist (new_media, format1, var_args); + event = gst_event_new_discontinuous_valist (rate, format1, var_args); va_end (var_args); @@ -359,3 +359,22 @@ gst_event_new_segment_seek (GstSeekType type, gint64 start, gint64 stop) return event; } + +/** + * gst_event_new_flush: + * @done: Indicates the end of the flush + * + * Allocate a new flush event. + * + * Returns: A new flush event. + */ +GstEvent * +gst_event_new_flush (gboolean done) +{ + GstEvent *event; + + event = gst_event_new (GST_EVENT_FLUSH); + GST_EVENT_FLUSH_DONE (event) = done; + + return event; +} diff --git a/gst/gstevent.h b/gst/gstevent.h index f064622fe5..58eb88af48 100644 --- a/gst/gstevent.h +++ b/gst/gstevent.h @@ -146,6 +146,8 @@ typedef struct #define GST_EVENT_DISCONT_OFFSET(event,i) (GST_EVENT(event)->event_data.discont.offsets[i]) #define GST_EVENT_DISCONT_OFFSET_LEN(event) (GST_EVENT(event)->event_data.discont.noffsets) +#define GST_EVENT_FLUSH_DONE(event) (GST_EVENT(event)->event_data.flush.done) + #define GST_EVENT_SIZE_FORMAT(event) (GST_EVENT(event)->event_data.size.format) #define GST_EVENT_SIZE_VALUE(event) (GST_EVENT(event)->event_data.size.value) @@ -171,6 +173,9 @@ struct _GstEvent { gint noffsets; gdouble rate; } discont; + struct { + gboolean done; + } flush; struct { GstFormat format; gint64 value; @@ -211,7 +216,7 @@ GstEvent* gst_event_new_segment_seek (GstSeekType type, gint64 start, gint64 sto GstEvent* gst_event_new_size (GstFormat format, gint64 value); /* discontinous event */ -GstEvent* gst_event_new_discontinuous (gboolean new_media, +GstEvent* gst_event_new_discontinuous (gdouble rate, GstFormat format1, ...); GstEvent* gst_event_new_discontinuous_valist (gdouble rate, GstFormat format1, @@ -222,7 +227,7 @@ gboolean gst_event_discont_get_value (GstEvent *event, GstFormat format, #define gst_event_new_filler() gst_event_new(GST_EVENT_FILLER) /* flush events */ -#define gst_event_new_flush() gst_event_new(GST_EVENT_FLUSH) +GstEvent* gst_event_new_flush (gboolean done); G_END_DECLS diff --git a/gst/gstpad.c b/gst/gstpad.c index 77533c60a0..f773777316 100644 --- a/gst/gstpad.c +++ b/gst/gstpad.c @@ -2760,6 +2760,9 @@ gst_pad_push (GstPad * pad, GstBuffer * buffer) if (G_UNLIKELY (!GST_RPAD_IS_ACTIVE (peer))) goto not_active; + if (G_UNLIKELY (GST_RPAD_IS_FLUSHING (peer))) + goto flushing; + gst_object_ref (GST_OBJECT_CAST (peer)); GST_UNLOCK (pad); @@ -2803,6 +2806,12 @@ not_active: "pushing, but it was inactive"); GST_UNLOCK_RETURN (pad, GST_FLOW_WRONG_STATE); } +flushing: + { + GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, + "pushing, but pad was flushing"); + GST_UNLOCK_RETURN (pad, GST_FLOW_UNEXPECTED); + } not_negotiated: { GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, @@ -3456,6 +3465,7 @@ gst_pad_push_event (GstPad * pad, GstEvent * event) peerpad = GST_RPAD_PEER (pad); if (peerpad == NULL) goto not_linked; + gst_object_ref (GST_OBJECT_CAST (peerpad)); GST_UNLOCK (pad); @@ -3504,6 +3514,21 @@ gst_pad_send_event (GstPad * pad, GstEvent * event) GST_CAT_DEBUG (GST_CAT_EVENT, "have event type %d on pad %s:%s", GST_EVENT_TYPE (event), GST_DEBUG_PAD_NAME (rpad)); + if (GST_PAD_IS_SINK (pad)) { + if (GST_EVENT_TYPE (event) == GST_EVENT_FLUSH) { + GST_CAT_DEBUG (GST_CAT_EVENT, "have flush event"); + GST_LOCK (pad); + if (GST_EVENT_FLUSH_DONE (event)) { + GST_CAT_DEBUG (GST_CAT_EVENT, "clear flush flag"); + GST_FLAG_UNSET (pad, GST_PAD_FLUSHING); + } else { + GST_CAT_DEBUG (GST_CAT_EVENT, "set flush flag"); + GST_FLAG_SET (pad, GST_PAD_FLUSHING); + } + GST_UNLOCK (pad); + } + } + if ((eventfunc = GST_RPAD_EVENTFUNC (rpad)) == NULL) goto no_function; diff --git a/gst/gstpad.h b/gst/gstpad.h index affbb3b438..73aacfff26 100644 --- a/gst/gstpad.h +++ b/gst/gstpad.h @@ -102,11 +102,11 @@ typedef enum { typedef enum { GST_FLOW_OK = 0, /* data passing was ok */ GST_FLOW_RESEND = 1, /* resend buffer, possibly with new caps */ - GST_FLOW_ERROR = -1, /* some error occured */ + GST_FLOW_ERROR = -1, /* some (fatal) error occured */ GST_FLOW_NOT_CONNECTED = -2, /* pad is not connected */ GST_FLOW_NOT_NEGOTIATED = -3, /* pad is not negotiated */ GST_FLOW_WRONG_STATE = -4, /* pad is in wrong state */ - GST_FLOW_UNEXPECTED = -5, /* did not expect anything */ + GST_FLOW_UNEXPECTED = -5, /* did not expect anything, this is not fatal */ GST_FLOW_NOT_SUPPORTED = -6 /* function not supported */ } GstFlowReturn; diff --git a/gst/gsttask.c b/gst/gsttask.c index be12028957..c66c9d3bd5 100644 --- a/gst/gsttask.c +++ b/gst/gsttask.c @@ -116,3 +116,19 @@ gst_task_stop (GstTask * task) return result; } + +gboolean +gst_task_pause (GstTask * task) +{ + GstTaskClass *tclass; + gboolean result = FALSE; + + g_return_val_if_fail (GST_IS_TASK (task), result); + + tclass = GST_TASK_GET_CLASS (task); + + if (tclass->pause) + result = tclass->pause (task); + + return result; +} diff --git a/gst/gsttask.h b/gst/gsttask.h index 429fd7606b..a14bdf61a5 100644 --- a/gst/gsttask.h +++ b/gst/gsttask.h @@ -49,6 +49,7 @@ struct _GstTaskClass { gboolean (*start) (GstTask *task); gboolean (*stop) (GstTask *task); + gboolean (*pause) (GstTask *task); gpointer _gst_reserved[GST_PADDING]; }; @@ -57,6 +58,7 @@ GType gst_task_get_type (void); gboolean gst_task_start (GstTask *task); gboolean gst_task_stop (GstTask *task); +gboolean gst_task_pause (GstTask *task); G_END_DECLS diff --git a/gst/schedulers/threadscheduler.c b/gst/schedulers/threadscheduler.c index f7e87c3e9b..7c6fd7e5f0 100644 --- a/gst/schedulers/threadscheduler.c +++ b/gst/schedulers/threadscheduler.c @@ -79,7 +79,8 @@ typedef struct _GstThreadSchedulerTaskClass GstThreadSchedulerTaskClass; typedef enum { STATE_STOPPED, - STATE_STARTED + STATE_STARTED, + STATE_PAUSED } TaskState; @@ -89,6 +90,7 @@ struct _GstThreadSchedulerTask TaskState state; GMutex *lock; + GCond *cond; GstTaskFunction func; gpointer data; @@ -106,6 +108,7 @@ static void gst_thread_scheduler_task_init (GstThreadSchedulerTask * object); static gboolean gst_thread_scheduler_task_start (GstTask * task); static gboolean gst_thread_scheduler_task_stop (GstTask * task); +static gboolean gst_thread_scheduler_task_pause (GstTask * task); GType gst_thread_scheduler_task_get_type (void) @@ -139,6 +142,7 @@ gst_thread_scheduler_task_class_init (gpointer klass, gpointer class_data) task->start = gst_thread_scheduler_task_start; task->stop = gst_thread_scheduler_task_stop; + task->pause = gst_thread_scheduler_task_pause; } static void @@ -146,6 +150,7 @@ gst_thread_scheduler_task_init (GstThreadSchedulerTask * task) { task->state = STATE_STOPPED; task->lock = g_mutex_new (); + task->cond = g_cond_new (); } static gboolean @@ -156,10 +161,12 @@ gst_thread_scheduler_task_start (GstTask * task) GST_THREAD_SCHEDULER (gst_object_get_parent (GST_OBJECT (task))); g_mutex_lock (ttask->lock); - if (ttask->state != STATE_STARTED) { + if (ttask->state == STATE_STOPPED) { ttask->state = STATE_STARTED; - g_thread_pool_push (tsched->pool, task, NULL); + } else { + ttask->state = STATE_STARTED; + g_cond_signal (ttask->cond); } g_mutex_unlock (ttask->lock); @@ -174,11 +181,24 @@ gst_thread_scheduler_task_stop (GstTask * task) g_mutex_lock (ttask->lock); if (ttask->state != STATE_STOPPED) { ttask->state = STATE_STOPPED; + g_cond_signal (ttask->cond); } g_mutex_unlock (ttask->lock); return TRUE; } +static gboolean +gst_thread_scheduler_task_pause (GstTask * task) +{ + GstThreadSchedulerTask *ttask = GST_THREAD_SCHEDULER_TASK (task); + + g_mutex_lock (ttask->lock); + if (ttask->state != STATE_PAUSED) { + ttask->state = STATE_PAUSED; + } + g_mutex_unlock (ttask->lock); + return TRUE; +} static void gst_thread_scheduler_class_init (gpointer g_class, gpointer data); static void gst_thread_scheduler_init (GstThreadScheduler * object); @@ -242,12 +262,20 @@ gst_thread_scheduler_func (GstThreadSchedulerTask * task, GST_DEBUG_OBJECT (sched, "Entering task %p, thread %p", task, g_thread_self ()); g_mutex_lock (task->lock); - while (G_LIKELY (task->state == STATE_STARTED)) { + while (G_LIKELY (task->state != STATE_STOPPED)) { + if (task->state == STATE_PAUSED) { + g_cond_wait (task->cond, task->lock); + if (task->state == STATE_STOPPED) + break; + } g_mutex_unlock (task->lock); + res = task->func (task->data); + g_mutex_lock (task->lock); - if (G_UNLIKELY (!res)) + if (G_UNLIKELY (!res)) { task->state = STATE_STOPPED; + } } g_mutex_unlock (task->lock); GST_DEBUG_OBJECT (sched, "Exit task %p, thread %p", task, g_thread_self ()); diff --git a/plugins/elements/gstfilesrc.c b/plugins/elements/gstfilesrc.c index c8e375baa1..e4b44a62e1 100644 --- a/plugins/elements/gstfilesrc.c +++ b/plugins/elements/gstfilesrc.c @@ -719,7 +719,7 @@ gst_filesrc_get (GstPad * pad, GstBuffer ** buffer) if (src->need_flush) { src->need_flush = FALSE; GST_DEBUG_OBJECT (src, "sending flush"); - gst_pad_push_event (pad, gst_event_new_flush ()); + gst_pad_push_event (pad, gst_event_new_flush (TRUE)); } /* check for seek */ if (src->need_discont) {