[MOVED FROM GST-P-FARSIGHT] Implement stopping in a nice thread safe way

20070914042042-3e2dc-1fe257ff4b72aca4b0eb5f285a14650b8df268c3.gz
This commit is contained in:
Olivier Crete 2007-09-14 04:20:42 +00:00 committed by Edward Hervey
parent f518316253
commit 7efc94bc12
2 changed files with 131 additions and 68 deletions

View file

@ -252,6 +252,7 @@ static void gst_dtmf_src_add_stop_event (GstDTMFSrc *dtmfsrc);
static gboolean gst_dtmf_src_unlock (GstBaseSrc *src); static gboolean gst_dtmf_src_unlock (GstBaseSrc *src);
static gboolean gst_dtmf_src_unlock_stop (GstBaseSrc *src);
static void static void
gst_dtmf_src_base_init (gpointer g_class) gst_dtmf_src_base_init (gpointer g_class)
@ -294,6 +295,8 @@ gst_dtmf_src_class_init (GstDTMFSrcClass * klass)
GST_DEBUG_FUNCPTR (gst_dtmf_src_change_state); GST_DEBUG_FUNCPTR (gst_dtmf_src_change_state);
gstbasesrc_class->unlock = gstbasesrc_class->unlock =
GST_DEBUG_FUNCPTR (gst_dtmf_src_unlock); GST_DEBUG_FUNCPTR (gst_dtmf_src_unlock);
gstbasesrc_class->unlock_stop =
GST_DEBUG_FUNCPTR (gst_dtmf_src_unlock_stop);
gstbasesrc_class->event = gstbasesrc_class->event =
GST_DEBUG_FUNCPTR (gst_dtmf_src_handle_event); GST_DEBUG_FUNCPTR (gst_dtmf_src_handle_event);
@ -315,8 +318,6 @@ gst_dtmf_src_init (GstDTMFSrc * dtmfsrc, GstDTMFSrcClass *g_class)
dtmfsrc->event_queue = g_async_queue_new (); dtmfsrc->event_queue = g_async_queue_new ();
dtmfsrc->last_event = NULL; dtmfsrc->last_event = NULL;
dtmfsrc->clock_id = NULL;
GST_DEBUG_OBJECT (dtmfsrc, "init done"); GST_DEBUG_OBJECT (dtmfsrc, "init done");
} }
@ -475,7 +476,7 @@ gst_dtmf_prepare_timestamps (GstDTMFSrc *dtmfsrc)
GstClock *clock; GstClock *clock;
GstClockTime base_time; GstClockTime base_time;
base_time = GST_ELEMENT_CAST (dtmfsrc)->base_time; base_time = gst_element_get_base_time (GST_ELEMENT (dtmfsrc));
clock = gst_element_get_clock (GST_ELEMENT (dtmfsrc)); clock = gst_element_get_clock (GST_ELEMENT (dtmfsrc));
if (clock != NULL) { if (clock != NULL) {
@ -622,15 +623,19 @@ gst_dtmf_src_create (GstBaseSrc * basesrc, guint64 offset,
guint length, GstBuffer ** buffer) guint length, GstBuffer ** buffer)
{ {
GstBuffer *buf = NULL; GstBuffer *buf = NULL;
GstFlowReturn ret;
GstDTMFSrcEvent *event; GstDTMFSrcEvent *event;
GstDTMFSrc * dtmfsrc; GstDTMFSrc * dtmfsrc;
GstClock *clock;
GstClockID *clockid;
GstClockReturn clockret;
dtmfsrc = GST_DTMF_SRC (basesrc); dtmfsrc = GST_DTMF_SRC (basesrc);
g_async_queue_ref (dtmfsrc->event_queue); do {
if (dtmfsrc->paused)
goto paused;
start:
if (dtmfsrc->last_event == NULL) { if (dtmfsrc->last_event == NULL) {
GST_DEBUG_OBJECT (dtmfsrc, "popping"); GST_DEBUG_OBJECT (dtmfsrc, "popping");
event = g_async_queue_pop (dtmfsrc->event_queue); event = g_async_queue_pop (dtmfsrc->event_queue);
@ -654,8 +659,11 @@ gst_dtmf_src_create (GstBaseSrc * basesrc, guint64 offset,
* the task is really paused (and the queue will then be flushed) * the task is really paused (and the queue will then be flushed)
*/ */
GST_DEBUG_OBJECT (dtmfsrc, "pushing pause_task..."); GST_DEBUG_OBJECT (dtmfsrc, "pushing pause_task...");
if (dtmfsrc->paused) {
g_async_queue_push (dtmfsrc->event_queue, event); g_async_queue_push (dtmfsrc->event_queue, event);
g_async_queue_unref (dtmfsrc->event_queue); goto paused;
}
} }
} else if (dtmfsrc->last_event->packet_count * dtmfsrc->interval >= } else if (dtmfsrc->last_event->packet_count * dtmfsrc->interval >=
MIN_DUTY_CYCLE) { MIN_DUTY_CYCLE) {
@ -667,37 +675,72 @@ gst_dtmf_src_create (GstBaseSrc * basesrc, guint64 offset,
"Received two consecutive DTMF start events"); "Received two consecutive DTMF start events");
} else if (event->event_type == DTMF_EVENT_TYPE_STOP) { } else if (event->event_type == DTMF_EVENT_TYPE_STOP) {
gst_dtmf_src_set_stream_lock (dtmfsrc, FALSE); gst_dtmf_src_set_stream_lock (dtmfsrc, FALSE);
g_free (dtmfsrc->last_event); g_free (dtmfsrc->last_event);
dtmfsrc->last_event = NULL; dtmfsrc->last_event = NULL;
goto start;
} else if (event->event_type == DTMF_EVENT_TYPE_PAUSE_TASK) { } else if (event->event_type == DTMF_EVENT_TYPE_PAUSE_TASK) {
/* /*
* We're pushing it back because it has to stay in there until * We're pushing it back because it has to stay in there until
* the task is really paused (and the queue will then be flushed) * the task is really paused (and the queue will then be flushed)
*/ */
GST_DEBUG_OBJECT (dtmfsrc, "pushing pause_task..."); GST_DEBUG_OBJECT (dtmfsrc, "pushing pause_task...");
if (dtmfsrc->paused) {
g_async_queue_push (dtmfsrc->event_queue, event); g_async_queue_push (dtmfsrc->event_queue, event);
g_async_queue_unref (dtmfsrc->event_queue); goto paused;
} }
} }
} }
g_async_queue_unref (dtmfsrc->event_queue); }
} while (dtmfsrc->last_event == NULL);
GST_DEBUG_OBJECT (dtmfsrc, "end event check"); GST_DEBUG_OBJECT (dtmfsrc, "end event check, now wait for the proper time");
clock = gst_element_get_clock (GST_ELEMENT (basesrc));
clockid = gst_clock_new_single_shot_id (clock, dtmfsrc->timestamp +
gst_element_get_base_time (GST_ELEMENT (dtmfsrc)));
gst_object_unref (clock);
GST_OBJECT_LOCK (dtmfsrc);
if (!dtmfsrc->paused) {
dtmfsrc->clockid = clockid;
GST_OBJECT_UNLOCK (dtmfsrc);
clockret = gst_clock_id_wait (clockid, NULL);
GST_OBJECT_LOCK (dtmfsrc);
if (dtmfsrc->paused)
clockret = GST_CLOCK_UNSCHEDULED;
} else {
clockret = GST_CLOCK_UNSCHEDULED;
}
gst_clock_id_unref (clockid);
dtmfsrc->clockid = NULL;
GST_OBJECT_UNLOCK (dtmfsrc);
if (clockret == GST_CLOCK_UNSCHEDULED) {
goto paused;
}
if (dtmfsrc->last_event) {
buf = gst_dtmf_src_create_next_tone_packet (dtmfsrc, dtmfsrc->last_event); buf = gst_dtmf_src_create_next_tone_packet (dtmfsrc, dtmfsrc->last_event);
GST_DEBUG_OBJECT (dtmfsrc, "Created buffer of size %d", GST_BUFFER_SIZE (buf)); GST_DEBUG_OBJECT (dtmfsrc, "Created buffer of size %d", GST_BUFFER_SIZE (buf));
*buffer = buf; *buffer = buf;
ret = GST_FLOW_OK;
} else { GST_DEBUG_OBJECT (dtmfsrc, "returning a buffer");
*buffer = NULL; return GST_FLOW_OK;
ret = GST_FLOW_WRONG_STATE;
paused:
if (dtmfsrc->last_event) {
GST_DEBUG_OBJECT (dtmfsrc, "Stopping current event");
/* Don't forget to release the stream lock */
gst_dtmf_src_set_stream_lock (dtmfsrc, FALSE);
g_free (dtmfsrc->last_event);
dtmfsrc->last_event = NULL;
} }
GST_DEBUG_OBJECT (dtmfsrc, "returning"); return GST_FLOW_WRONG_STATE;
return ret;
} }
@ -706,7 +749,16 @@ gst_dtmf_src_unlock (GstBaseSrc *src) {
GstDTMFSrc *dtmfsrc = GST_DTMF_SRC (src); GstDTMFSrc *dtmfsrc = GST_DTMF_SRC (src);
GstDTMFSrcEvent *event = NULL; GstDTMFSrcEvent *event = NULL;
GST_DEBUG_OBJECT (dtmfsrc, "Pushing the PAUSE_TASK even on PAUSED_TO_READY change"); GST_DEBUG_OBJECT (dtmfsrc, "Called unlock");
GST_OBJECT_LOCK (dtmfsrc);
dtmfsrc->paused = TRUE;
if (dtmfsrc->clockid) {
gst_clock_id_unschedule (dtmfsrc->clockid);
}
GST_OBJECT_UNLOCK (dtmfsrc);
GST_DEBUG_OBJECT (dtmfsrc, "Pushing the PAUSE_TASK event on unlock request");
event = g_malloc (sizeof(GstDTMFSrcEvent)); event = g_malloc (sizeof(GstDTMFSrcEvent));
event->event_type = DTMF_EVENT_TYPE_PAUSE_TASK; event->event_type = DTMF_EVENT_TYPE_PAUSE_TASK;
g_async_queue_push (dtmfsrc->event_queue, event); g_async_queue_push (dtmfsrc->event_queue, event);
@ -714,6 +766,20 @@ gst_dtmf_src_unlock (GstBaseSrc *src) {
return TRUE; return TRUE;
} }
static gboolean
gst_dtmf_src_unlock_stop (GstBaseSrc *src) {
GstDTMFSrc *dtmfsrc = GST_DTMF_SRC (src);
GST_DEBUG_OBJECT (dtmfsrc, "Unlock stopped");
GST_OBJECT_LOCK (dtmfsrc);
dtmfsrc->paused = FALSE;
GST_OBJECT_UNLOCK (dtmfsrc);
return TRUE;
}
static GstStateChangeReturn static GstStateChangeReturn
gst_dtmf_src_change_state (GstElement * element, GstStateChange transition) gst_dtmf_src_change_state (GstElement * element, GstStateChange transition)
{ {
@ -726,7 +792,6 @@ gst_dtmf_src_change_state (GstElement * element, GstStateChange transition)
switch (transition) { switch (transition) {
case GST_STATE_CHANGE_READY_TO_PAUSED: case GST_STATE_CHANGE_READY_TO_PAUSED:
case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
/* Flushing the event queue */ /* Flushing the event queue */
event = g_async_queue_try_pop (dtmfsrc->event_queue); event = g_async_queue_try_pop (dtmfsrc->event_queue);
@ -734,6 +799,7 @@ gst_dtmf_src_change_state (GstElement * element, GstStateChange transition)
g_free (event); g_free (event);
event = g_async_queue_try_pop (dtmfsrc->event_queue); event = g_async_queue_try_pop (dtmfsrc->event_queue);
} }
no_preroll = TRUE;
break; break;
default: default:
break; break;
@ -746,16 +812,12 @@ gst_dtmf_src_change_state (GstElement * element, GstStateChange transition)
switch (transition) { switch (transition) {
case GST_STATE_CHANGE_PLAYING_TO_PAUSED: case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
GST_DEBUG_OBJECT (dtmfsrc, "PLAYING TO PAUSED"); GST_DEBUG_OBJECT (dtmfsrc, "PLAYING TO PAUSED");
if (dtmfsrc->last_event) { no_preroll = TRUE;
GST_DEBUG_OBJECT (dtmfsrc, "Stopping current event"); break;
/* Don't forget to release the stream lock */ case GST_STATE_CHANGE_PAUSED_TO_READY:
gst_dtmf_src_set_stream_lock (dtmfsrc, FALSE);
g_free (dtmfsrc->last_event);
dtmfsrc->last_event = NULL;
}
GST_DEBUG_OBJECT (dtmfsrc, "Flushing event queue"); GST_DEBUG_OBJECT (dtmfsrc, "Flushing event queue");
/* Flushing the event queue */ /* Flushing the event queue */
event = g_async_queue_try_pop (dtmfsrc->event_queue); event = g_async_queue_try_pop (dtmfsrc->event_queue);

View file

@ -73,11 +73,12 @@ struct _GstDTMFSrc {
GstBaseSrc parent; GstBaseSrc parent;
GAsyncQueue* event_queue; GAsyncQueue* event_queue;
GstDTMFSrcEvent* last_event; GstDTMFSrcEvent* last_event;
GstClockID clock_id;
gboolean task_paused;
guint16 interval; guint16 interval;
GstClockTime timestamp; GstClockTime timestamp;
gboolean paused;
GstClockID clockid;
}; };