rtspsrc: Rework the async state handling

Always send the flushing events to the udp elements now that basesrc supports
this. This makes sure a segment event is sent correctly after a flush.
Keep track of the currently executing command and make it possible to specify
what command you want to cancel when starting a new async command.

See https://bugzilla.gnome.org/show_bug.cgi?id=677905
This commit is contained in:
Wim Taymans 2012-06-12 16:05:40 +02:00
parent ea17c457f9
commit 935472aba7
2 changed files with 54 additions and 66 deletions

View file

@ -248,15 +248,14 @@ static void gst_rtspsrc_handle_message (GstBin * bin, GstMessage * message);
static gboolean gst_rtspsrc_setup_auth (GstRTSPSrc * src,
GstRTSPMessage * response);
static void gst_rtspsrc_loop_send_cmd (GstRTSPSrc * src, gint cmd);
static void gst_rtspsrc_loop_send_cmd (GstRTSPSrc * src, gint cmd, gint mask);
static GstRTSPResult gst_rtspsrc_send_cb (GstRTSPExtension * ext,
GstRTSPMessage * request, GstRTSPMessage * response, GstRTSPSrc * src);
static GstRTSPResult gst_rtspsrc_open (GstRTSPSrc * src, gboolean async);
static GstRTSPResult gst_rtspsrc_play (GstRTSPSrc * src, GstSegment * segment,
gboolean async);
static GstRTSPResult gst_rtspsrc_pause (GstRTSPSrc * src, gboolean idle,
gboolean async);
static GstRTSPResult gst_rtspsrc_pause (GstRTSPSrc * src, gboolean async);
static GstRTSPResult gst_rtspsrc_close (GstRTSPSrc * src, gboolean async,
gboolean only_close);
@ -266,18 +265,17 @@ static gboolean gst_rtspsrc_uri_set_uri (GstURIHandler * handler,
static gboolean gst_rtspsrc_activate_streams (GstRTSPSrc * src);
static gboolean gst_rtspsrc_loop (GstRTSPSrc * src);
static gboolean gst_rtspsrc_stream_push_event (GstRTSPSrc * src,
GstRTSPStream * stream, GstEvent * event, gboolean source);
static gboolean gst_rtspsrc_push_event (GstRTSPSrc * src, GstEvent * event,
gboolean source);
GstRTSPStream * stream, GstEvent * event);
static gboolean gst_rtspsrc_push_event (GstRTSPSrc * src, GstEvent * event);
/* commands we send to out loop to notify it of events */
#define CMD_OPEN 0
#define CMD_PLAY 1
#define CMD_PAUSE 2
#define CMD_CLOSE 3
#define CMD_WAIT 4
#define CMD_RECONNECT 5
#define CMD_LOOP 6
#define CMD_OPEN (1 << 0)
#define CMD_PLAY (1 << 1)
#define CMD_PAUSE (1 << 2)
#define CMD_CLOSE (1 << 3)
#define CMD_WAIT (1 << 4)
#define CMD_RECONNECT (1 << 5)
#define CMD_LOOP (1 << 6)
#define GST_ELEMENT_PROGRESS(el, type, code, text) \
G_STMT_START { \
@ -1688,8 +1686,8 @@ gst_rtspsrc_flush (GstRTSPSrc * src, gboolean flush, gboolean playing)
gst_object_unref (clock);
}
}
gst_rtspsrc_push_event (src, event, FALSE);
gst_rtspsrc_loop_send_cmd (src, cmd);
gst_rtspsrc_push_event (src, event);
gst_rtspsrc_loop_send_cmd (src, cmd, CMD_LOOP);
/* set up manager before data-flow resumes */
/* to manage jitterbuffer buffer mode */
@ -1866,7 +1864,7 @@ gst_rtspsrc_perform_seek (GstRTSPSrc * src, GstEvent * event)
if (playing) {
/* obtain current position in case seek fails */
gst_rtspsrc_get_position (src);
gst_rtspsrc_pause (src, FALSE, FALSE);
gst_rtspsrc_pause (src, FALSE);
}
src->skip = skip;
@ -2291,7 +2289,7 @@ gst_rtspsrc_do_stream_eos (GstRTSPSrc * src, GstRTSPStream * stream)
goto was_eos;
stream->eos = TRUE;
gst_rtspsrc_stream_push_event (src, stream, gst_event_new_eos (), TRUE);
gst_rtspsrc_stream_push_event (src, stream, gst_event_new_eos ());
return;
/* ERRORS */
@ -3215,7 +3213,7 @@ done:
static gboolean
gst_rtspsrc_stream_push_event (GstRTSPSrc * src, GstRTSPStream * stream,
GstEvent * event, gboolean source)
GstEvent * event)
{
gboolean res = TRUE;
@ -3223,7 +3221,7 @@ gst_rtspsrc_stream_push_event (GstRTSPSrc * src, GstRTSPStream * stream,
if (stream->srcpad == NULL)
goto done;
if (source && stream->udpsrc[0]) {
if (stream->udpsrc[0]) {
gst_event_ref (event);
res = gst_element_send_event (stream->udpsrc[0], event);
} else if (stream->channelpad[0]) {
@ -3234,7 +3232,7 @@ gst_rtspsrc_stream_push_event (GstRTSPSrc * src, GstRTSPStream * stream,
res = gst_pad_send_event (stream->channelpad[0], event);
}
if (source && stream->udpsrc[1]) {
if (stream->udpsrc[1]) {
gst_event_ref (event);
res &= gst_element_send_event (stream->udpsrc[1], event);
} else if (stream->channelpad[1]) {
@ -3252,7 +3250,7 @@ done:
}
static gboolean
gst_rtspsrc_push_event (GstRTSPSrc * src, GstEvent * event, gboolean source)
gst_rtspsrc_push_event (GstRTSPSrc * src, GstEvent * event)
{
GList *streams;
gboolean res = TRUE;
@ -3261,7 +3259,7 @@ gst_rtspsrc_push_event (GstRTSPSrc * src, GstEvent * event, gboolean source)
GstRTSPStream *ostream = (GstRTSPStream *) streams->data;
gst_event_ref (event);
res &= gst_rtspsrc_stream_push_event (src, ostream, event, source);
res &= gst_rtspsrc_stream_push_event (src, ostream, event);
}
gst_event_unref (event);
@ -4067,7 +4065,7 @@ gst_rtspsrc_loop_end_cmd (GstRTSPSrc * src, gint cmd, GstRTSPResult ret)
}
static void
gst_rtspsrc_loop_send_cmd (GstRTSPSrc * src, gint cmd)
gst_rtspsrc_loop_send_cmd (GstRTSPSrc * src, gint cmd, gint mask)
{
gint old;
@ -4077,19 +4075,21 @@ gst_rtspsrc_loop_send_cmd (GstRTSPSrc * src, gint cmd)
GST_DEBUG_OBJECT (src, "sending cmd %d", cmd);
GST_OBJECT_LOCK (src);
old = src->loop_cmd;
old = src->pending_cmd;
if (old != CMD_WAIT) {
src->loop_cmd = CMD_WAIT;
src->pending_cmd = CMD_WAIT;
GST_OBJECT_UNLOCK (src);
/* cancel previous request */
gst_rtspsrc_loop_cancel_cmd (src, old);
GST_OBJECT_LOCK (src);
}
src->loop_cmd = cmd;
src->pending_cmd = cmd;
/* interrupt if allowed */
if (src->waiting) {
GST_DEBUG_OBJECT (src, "start connection flush");
if (src->busy_cmd & mask) {
GST_DEBUG_OBJECT (src, "connection flush busy %d", src->busy_cmd);
gst_rtspsrc_connection_flush (src, TRUE);
} else {
GST_DEBUG_OBJECT (src, "not interrupting busy cmd %d", src->busy_cmd);
}
if (src->task)
gst_task_start (src->task);
@ -4134,7 +4134,7 @@ pause:
gst_message_new_segment_done (GST_OBJECT_CAST (src),
src->segment.format, src->segment.position));
} else {
gst_rtspsrc_push_event (src, gst_event_new_eos (), FALSE);
gst_rtspsrc_push_event (src, gst_event_new_eos ());
}
} else if (ret == GST_FLOW_NOT_LINKED || ret < GST_FLOW_EOS) {
/* for fatal errors we post an error message, post the error before the
@ -4142,7 +4142,7 @@ pause:
GST_ELEMENT_ERROR (src, STREAM, FAILED,
("Internal data flow error."),
("streaming task paused, reason %s (%d)", reason, ret));
gst_rtspsrc_push_event (src, gst_event_new_eos (), FALSE);
gst_rtspsrc_push_event (src, gst_event_new_eos ());
}
return FALSE;
}
@ -6252,7 +6252,7 @@ send_error:
}
static GstRTSPResult
gst_rtspsrc_pause (GstRTSPSrc * src, gboolean idle, gboolean async)
gst_rtspsrc_pause (GstRTSPSrc * src, gboolean async)
{
GstRTSPResult res = GST_RTSP_OK;
GstRTSPMessage request = { 0 };
@ -6399,7 +6399,7 @@ gst_rtspsrc_handle_message (GstBin * bin, GstMessage * message)
/* we only act on the first udp timeout message, others are irrelevant
* and can be ignored. */
if (!ignore_timeout)
gst_rtspsrc_loop_send_cmd (rtspsrc, CMD_RECONNECT);
gst_rtspsrc_loop_send_cmd (rtspsrc, CMD_RECONNECT, CMD_LOOP);
/* eat and free */
gst_message_unref (message);
return;
@ -6456,46 +6456,36 @@ static void
gst_rtspsrc_thread (GstRTSPSrc * src)
{
gint cmd;
GstRTSPResult ret;
gboolean running = FALSE;
GST_OBJECT_LOCK (src);
cmd = src->loop_cmd;
src->loop_cmd = CMD_WAIT;
cmd = src->pending_cmd;
src->pending_cmd = CMD_WAIT;
GST_DEBUG_OBJECT (src, "got command %d", cmd);
/* we got the message command, so ensure communication is possible again */
gst_rtspsrc_connection_flush (src, FALSE);
/* we allow these to be interrupted */
if (cmd == CMD_LOOP || cmd == CMD_CLOSE || cmd == CMD_PAUSE)
src->waiting = TRUE;
src->busy_cmd = cmd;
GST_OBJECT_UNLOCK (src);
switch (cmd) {
case CMD_OPEN:
ret = gst_rtspsrc_open (src, TRUE);
gst_rtspsrc_open (src, TRUE);
break;
case CMD_PLAY:
ret = gst_rtspsrc_play (src, &src->segment, TRUE);
if (ret == GST_RTSP_OK)
running = TRUE;
gst_rtspsrc_play (src, &src->segment, TRUE);
break;
case CMD_PAUSE:
ret = gst_rtspsrc_pause (src, TRUE, TRUE);
if (ret == GST_RTSP_OK)
running = TRUE;
gst_rtspsrc_pause (src, TRUE);
break;
case CMD_CLOSE:
ret = gst_rtspsrc_close (src, TRUE, FALSE);
gst_rtspsrc_close (src, TRUE, FALSE);
break;
case CMD_LOOP:
running = gst_rtspsrc_loop (src);
gst_rtspsrc_loop (src);
break;
case CMD_RECONNECT:
ret = gst_rtspsrc_reconnect (src, FALSE);
if (ret == GST_RTSP_OK)
running = TRUE;
gst_rtspsrc_reconnect (src, FALSE);
break;
default:
break;
@ -6503,14 +6493,12 @@ gst_rtspsrc_thread (GstRTSPSrc * src)
GST_OBJECT_LOCK (src);
/* and go back to sleep */
if (src->loop_cmd == CMD_WAIT) {
if (running)
src->loop_cmd = CMD_LOOP;
else if (src->task)
if (src->pending_cmd == CMD_WAIT) {
if (src->task)
gst_task_pause (src->task);
}
/* reset waiting */
src->waiting = FALSE;
src->busy_cmd = CMD_WAIT;
GST_OBJECT_UNLOCK (src);
}
@ -6521,7 +6509,7 @@ gst_rtspsrc_start (GstRTSPSrc * src)
GST_OBJECT_LOCK (src);
src->loop_cmd = CMD_WAIT;
src->pending_cmd = CMD_WAIT;
if (src->task == NULL) {
src->task = gst_task_new ((GstTaskFunction) gst_rtspsrc_thread, src);
@ -6550,7 +6538,7 @@ gst_rtspsrc_stop (GstRTSPSrc * src)
GST_DEBUG_OBJECT (src, "stopping");
/* also cancels pending task */
gst_rtspsrc_loop_send_cmd (src, CMD_WAIT);
gst_rtspsrc_loop_send_cmd (src, CMD_WAIT, CMD_CLOSE);
GST_OBJECT_LOCK (src);
if ((task = src->task)) {
@ -6598,12 +6586,12 @@ gst_rtspsrc_change_state (GstElement * element, GstStateChange transition)
/* first attempt, don't ignore timeouts */
rtspsrc->ignore_timeout = FALSE;
rtspsrc->open_error = FALSE;
gst_rtspsrc_loop_send_cmd (rtspsrc, CMD_OPEN);
gst_rtspsrc_loop_send_cmd (rtspsrc, CMD_OPEN, 0);
break;
case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
/* unblock the tcp tasks and make the loop waiting */
gst_rtspsrc_loop_send_cmd (rtspsrc, CMD_WAIT);
gst_rtspsrc_loop_send_cmd (rtspsrc, CMD_WAIT, CMD_LOOP);
break;
case GST_STATE_CHANGE_PAUSED_TO_READY:
break;
@ -6617,18 +6605,18 @@ gst_rtspsrc_change_state (GstElement * element, GstStateChange transition)
switch (transition) {
case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
gst_rtspsrc_loop_send_cmd (rtspsrc, CMD_PLAY);
gst_rtspsrc_loop_send_cmd (rtspsrc, CMD_PLAY, 0);
break;
case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
/* send pause request and keep the idle task around */
gst_rtspsrc_loop_send_cmd (rtspsrc, CMD_PAUSE);
gst_rtspsrc_loop_send_cmd (rtspsrc, CMD_PAUSE, CMD_LOOP);
ret = GST_STATE_CHANGE_NO_PREROLL;
break;
case GST_STATE_CHANGE_READY_TO_PAUSED:
ret = GST_STATE_CHANGE_NO_PREROLL;
break;
case GST_STATE_CHANGE_PAUSED_TO_READY:
gst_rtspsrc_loop_send_cmd (rtspsrc, CMD_CLOSE);
gst_rtspsrc_loop_send_cmd (rtspsrc, CMD_CLOSE, CMD_PAUSE);
break;
case GST_STATE_CHANGE_READY_TO_NULL:
gst_rtspsrc_stop (rtspsrc);
@ -6656,7 +6644,7 @@ gst_rtspsrc_send_event (GstElement * element, GstEvent * event)
rtspsrc = GST_RTSPSRC (element);
if (GST_EVENT_IS_DOWNSTREAM (event)) {
res = gst_rtspsrc_push_event (rtspsrc, event, TRUE);
res = gst_rtspsrc_push_event (rtspsrc, event);
} else {
res = GST_ELEMENT_CLASS (parent_class)->send_event (element, event);
}

View file

@ -178,9 +178,9 @@ struct _GstRTSPSrc {
GstClockTime base_time;
/* UDP mode loop */
gint loop_cmd;
gint pending_cmd;
gint busy_cmd;
gboolean ignore_timeout;
gboolean waiting;
gboolean open_error;
/* mutex for protecting state changes */