From 220e47adcf3c6471bbcb17e85b9fa124ba35ae8b Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Fri, 7 Jan 2011 13:43:06 +0100 Subject: [PATCH] rtspsrc: implement more async handling Remove some old locks. Make sure we never go into the loop function when flushing. --- gst/rtsp/gstrtspsrc.c | 111 +++++++++++++++++++++++------------------- gst/rtsp/gstrtspsrc.h | 9 +--- 2 files changed, 63 insertions(+), 57 deletions(-) diff --git a/gst/rtsp/gstrtspsrc.c b/gst/rtsp/gstrtspsrc.c index 404a362152..b2c1c42bbb 100644 --- a/gst/rtsp/gstrtspsrc.c +++ b/gst/rtsp/gstrtspsrc.c @@ -270,8 +270,7 @@ static gboolean gst_rtspsrc_push_event (GstRTSPSrc * src, GstEvent * event, #define CMD_CLOSE 3 #define CMD_WAIT 4 #define CMD_RECONNECT 5 -#define CMD_STOP 6 -#define CMD_LOOP 7 +#define CMD_LOOP 6 #define GST_ELEMENT_PROGRESS(el, type, code, text) \ G_STMT_START { \ @@ -527,10 +526,6 @@ gst_rtspsrc_init (GstRTSPSrc * src, GstRTSPSrcClass * g_class) src->state_rec_lock = g_new (GStaticRecMutex, 1); g_static_rec_mutex_init (src->state_rec_lock); - /* protects access to the server connection */ - src->conn_rec_lock = g_new (GStaticRecMutex, 1); - g_static_rec_mutex_init (src->conn_rec_lock); - src->state = GST_RTSP_STATE_INVALID; } @@ -558,8 +553,6 @@ gst_rtspsrc_finalize (GObject * object) g_free (rtspsrc->stream_rec_lock); g_static_rec_mutex_free (rtspsrc->state_rec_lock); g_free (rtspsrc->state_rec_lock); - g_static_rec_mutex_free (rtspsrc->conn_rec_lock); - g_free (rtspsrc->conn_rec_lock); #ifdef G_OS_WIN32 WSACleanup (); @@ -1668,7 +1661,7 @@ gst_rtspsrc_flush (GstRTSPSrc * src, gboolean flush) if (flush) { event = gst_event_new_flush_start (); GST_DEBUG_OBJECT (src, "start flush"); - cmd = CMD_STOP; + cmd = CMD_WAIT; state = GST_STATE_PAUSED; } else { event = gst_event_new_flush_stop (); @@ -1718,12 +1711,10 @@ gst_rtspsrc_connection_send (GstRTSPSrc * src, GstRTSPConnection * conn, { GstRTSPResult ret; - GST_RTSP_CONN_LOCK (src); if (conn) ret = gst_rtsp_connection_send (conn, message, timeout); else ret = GST_RTSP_ERROR; - GST_RTSP_CONN_UNLOCK (src); return ret; } @@ -1734,12 +1725,10 @@ gst_rtspsrc_connection_receive (GstRTSPSrc * src, GstRTSPConnection * conn, { GstRTSPResult ret; - GST_RTSP_CONN_LOCK (src); if (conn) ret = gst_rtsp_connection_receive (conn, message, timeout); else ret = GST_RTSP_ERROR; - GST_RTSP_CONN_UNLOCK (src); return ret; } @@ -3537,8 +3526,10 @@ gst_rtspsrc_loop_interleaved (GstRTSPSrc * src) /* see if the timeout period expired */ if ((tv_timeout.tv_sec | tv_timeout.tv_usec) == 0) { GST_DEBUG_OBJECT (src, "timout, sending keep-alive"); - /* send keep-alive, ignore the result, a warning will be posted. */ - gst_rtspsrc_send_keep_alive (src); + /* send keep-alive, only act on interrupt, a warning will be posted for + * other errors. */ + if ((res = gst_rtspsrc_send_keep_alive (src)) == GST_RTSP_EINTR) + goto interrupt; /* get new timeout */ gst_rtsp_connection_next_timeout (src->conninfo.connection, &tv_timeout); } @@ -3546,11 +3537,23 @@ gst_rtspsrc_loop_interleaved (GstRTSPSrc * src) GST_DEBUG_OBJECT (src, "doing receive with timeout %ld seconds, %ld usec", tv_timeout.tv_sec, tv_timeout.tv_usec); - /* protect the connection with the connection lock so that we can see when - * we are finished doing server communication */ - res = - gst_rtspsrc_connection_receive (src, src->conninfo.connection, &message, - src->ptcp_timeout); + GST_OBJECT_LOCK (src); + if (src->loop_cmd == CMD_LOOP && !src->flushing) { + src->waiting = TRUE; + GST_OBJECT_UNLOCK (src); + + /* protect the connection with the connection lock so that we can see when + * we are finished doing server communication */ + res = + gst_rtspsrc_connection_receive (src, src->conninfo.connection, + &message, src->ptcp_timeout); + + GST_OBJECT_LOCK (src); + src->waiting = FALSE; + } else { + res = GST_RTSP_EINTR; + } + GST_OBJECT_UNLOCK (src); switch (res) { case GST_RTSP_OK: @@ -3562,7 +3565,8 @@ gst_rtspsrc_loop_interleaved (GstRTSPSrc * src) case GST_RTSP_ETIMEOUT: /* no reply, send keep alive */ GST_DEBUG_OBJECT (src, "timeout, sending keep-alive"); - gst_rtspsrc_send_keep_alive (src); + if ((res = gst_rtspsrc_send_keep_alive (src)) == GST_RTSP_EINTR) + goto interrupt; continue; case GST_RTSP_EEOF: /* go EOS when the server closed the connection */ @@ -3779,12 +3783,25 @@ gst_rtspsrc_loop_udp (GstRTSPSrc * src) (gint) tv_timeout.tv_sec); gst_rtsp_message_unset (&message); - /* we should continue reading the TCP socket because the server might - * send us requests. When the session timeout expires, we need to send a - * keep-alive request to keep the session open. */ - res = - gst_rtspsrc_connection_receive (src, src->conninfo.connection, - &message, &tv_timeout); + + GST_OBJECT_LOCK (src); + if (src->loop_cmd == CMD_LOOP && !src->flushing) { + src->waiting = TRUE; + GST_OBJECT_UNLOCK (src); + + /* we should continue reading the TCP socket because the server might + * send us requests. When the session timeout expires, we need to send a + * keep-alive request to keep the session open. */ + res = + gst_rtspsrc_connection_receive (src, src->conninfo.connection, + &message, &tv_timeout); + + GST_OBJECT_LOCK (src); + src->waiting = FALSE; + } else { + res = GST_RTSP_EINTR; + } + GST_OBJECT_UNLOCK (src); switch (res) { case GST_RTSP_OK: @@ -3797,7 +3814,8 @@ gst_rtspsrc_loop_udp (GstRTSPSrc * src) case GST_RTSP_ETIMEOUT: /* send keep-alive, ignore the result, a warning will be posted. */ GST_DEBUG_OBJECT (src, "timeout, sending keep-alive"); - gst_rtspsrc_send_keep_alive (src); + if ((res = gst_rtspsrc_send_keep_alive (src)) == GST_RTSP_EINTR) + goto interrupt; continue; case GST_RTSP_EEOF: /* server closed the connection. not very fatal for UDP, reconnect and @@ -3832,7 +3850,8 @@ gst_rtspsrc_loop_udp (GstRTSPSrc * src) GST_DEBUG_OBJECT (src, "but is Unauthorized response ..."); if (gst_rtspsrc_setup_auth (src, &message) && !(retry++)) { GST_DEBUG_OBJECT (src, "so retrying keep-alive"); - gst_rtspsrc_send_keep_alive (src); + if ((res = gst_rtspsrc_send_keep_alive (src)) == GST_RTSP_EINTR) + goto interrupt; } } else { retry = 0; @@ -3857,7 +3876,6 @@ interrupt: if (src->loop_cmd != CMD_RECONNECT) goto stopping; - /* when we get here we have to reconnect using tcp */ src->loop_cmd = CMD_LOOP; @@ -3974,12 +3992,14 @@ gst_rtspsrc_loop_send_cmd (GstRTSPSrc * src, gint cmd, gboolean flush) { GST_OBJECT_LOCK (src); src->loop_cmd = cmd; + src->flushing = flush; if (flush) { - GST_DEBUG_OBJECT (src, "start connection flush"); - gst_rtspsrc_connection_flush (src, TRUE); + if (src->waiting) { + GST_DEBUG_OBJECT (src, "start connection flush"); + gst_rtspsrc_connection_flush (src, TRUE); + } } else { GST_DEBUG_OBJECT (src, "stop connection flush"); - gst_rtspsrc_connection_flush (src, FALSE); } if (src->task) gst_task_start (src->task); @@ -5862,13 +5882,6 @@ gst_rtspsrc_play (GstRTSPSrc * src, GstSegment * segment, gboolean async) if (!src->conninfo.connection || !src->conninfo.connected) goto done; - /* waiting for connection idle, we were flushing so any attempt at doing data - * transfer will result in pausing the tasks. */ - GST_DEBUG_OBJECT (src, "wait for connection idle"); - GST_RTSP_CONN_LOCK (src); - GST_DEBUG_OBJECT (src, "connection is idle now"); - GST_RTSP_CONN_UNLOCK (src); - /* send some dummy packets before we activate the receive in the * udp sources */ gst_rtspsrc_send_dummy_packets (src); @@ -6072,13 +6085,6 @@ gst_rtspsrc_pause (GstRTSPSrc * src, gboolean idle, gboolean async) if (src->state == GST_RTSP_STATE_READY) goto was_paused; - /* waiting for connection idle, we were flushing so any attempt at doing data - * transfer will result in pausing the tasks. */ - GST_DEBUG_OBJECT (src, "wait for connection idle"); - GST_RTSP_CONN_LOCK (src); - GST_DEBUG_OBJECT (src, "connection is idle now"); - GST_RTSP_CONN_UNLOCK (src); - if (!src->conninfo.connection || !src->conninfo.connected) goto no_connection; @@ -6272,7 +6278,8 @@ gst_rtspsrc_thread (GstRTSPSrc * src) GST_OBJECT_LOCK (src); cmd = src->loop_cmd; - src->loop_cmd = CMD_WAIT; + if (cmd != CMD_LOOP || src->flushing) + src->loop_cmd = CMD_WAIT; GST_DEBUG_OBJECT (src, "got command %d", cmd); GST_OBJECT_UNLOCK (src); @@ -6301,8 +6308,12 @@ gst_rtspsrc_thread (GstRTSPSrc * src) GST_OBJECT_LOCK (src); /* and go back to sleep */ - if (!running && src->loop_cmd == CMD_WAIT && src->task) - gst_task_pause (src->task); + if (src->loop_cmd == CMD_WAIT) { + if (running) + src->loop_cmd = CMD_LOOP; + else if (src->task) + gst_task_pause (src->task); + } GST_OBJECT_UNLOCK (src); } diff --git a/gst/rtsp/gstrtspsrc.h b/gst/rtsp/gstrtspsrc.h index c09442bc24..b5744248cc 100644 --- a/gst/rtsp/gstrtspsrc.h +++ b/gst/rtsp/gstrtspsrc.h @@ -79,10 +79,6 @@ typedef struct _GstRTSPSrcClass GstRTSPSrcClass; #define GST_RTSP_STREAM_LOCK(rtsp) (g_static_rec_mutex_lock (GST_RTSP_STREAM_GET_LOCK(rtsp))) #define GST_RTSP_STREAM_UNLOCK(rtsp) (g_static_rec_mutex_unlock (GST_RTSP_STREAM_GET_LOCK(rtsp))) -#define GST_RTSP_CONN_GET_LOCK(rtsp) (GST_RTSPSRC_CAST(rtsp)->conn_rec_lock) -#define GST_RTSP_CONN_LOCK(rtsp) (g_static_rec_mutex_lock (GST_RTSP_CONN_GET_LOCK(rtsp))) -#define GST_RTSP_CONN_UNLOCK(rtsp) (g_static_rec_mutex_unlock (GST_RTSP_CONN_GET_LOCK(rtsp))) - typedef struct _GstRTSPConnInfo GstRTSPConnInfo; struct _GstRTSPConnInfo { @@ -184,13 +180,12 @@ struct _GstRTSPSrc { /* UDP mode loop */ gint loop_cmd; gboolean ignore_timeout; + gboolean flushing; + gboolean waiting; /* mutex for protecting state changes */ GStaticRecMutex *state_rec_lock; - /* mutex for protecting the connection */ - GStaticRecMutex *conn_rec_lock; - GstSDPMessage *sdp; gboolean from_sdp; gint numstreams;