mirror of
https://gitlab.freedesktop.org/gstreamer/gstreamer.git
synced 2025-06-05 06:58:56 +00:00
rtspsrc: implement more async handling
Remove some old locks. Make sure we never go into the loop function when flushing.
This commit is contained in:
parent
2873585238
commit
220e47adcf
2 changed files with 63 additions and 57 deletions
|
@ -270,8 +270,7 @@ static gboolean gst_rtspsrc_push_event (GstRTSPSrc * src, GstEvent * event,
|
||||||
#define CMD_CLOSE 3
|
#define CMD_CLOSE 3
|
||||||
#define CMD_WAIT 4
|
#define CMD_WAIT 4
|
||||||
#define CMD_RECONNECT 5
|
#define CMD_RECONNECT 5
|
||||||
#define CMD_STOP 6
|
#define CMD_LOOP 6
|
||||||
#define CMD_LOOP 7
|
|
||||||
|
|
||||||
#define GST_ELEMENT_PROGRESS(el, type, code, text) \
|
#define GST_ELEMENT_PROGRESS(el, type, code, text) \
|
||||||
G_STMT_START { \
|
G_STMT_START { \
|
||||||
|
@ -527,10 +526,6 @@ gst_rtspsrc_init (GstRTSPSrc * src, GstRTSPSrcClass * g_class)
|
||||||
src->state_rec_lock = g_new (GStaticRecMutex, 1);
|
src->state_rec_lock = g_new (GStaticRecMutex, 1);
|
||||||
g_static_rec_mutex_init (src->state_rec_lock);
|
g_static_rec_mutex_init (src->state_rec_lock);
|
||||||
|
|
||||||
/* protects access to the server connection */
|
|
||||||
src->conn_rec_lock = g_new (GStaticRecMutex, 1);
|
|
||||||
g_static_rec_mutex_init (src->conn_rec_lock);
|
|
||||||
|
|
||||||
src->state = GST_RTSP_STATE_INVALID;
|
src->state = GST_RTSP_STATE_INVALID;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -558,8 +553,6 @@ gst_rtspsrc_finalize (GObject * object)
|
||||||
g_free (rtspsrc->stream_rec_lock);
|
g_free (rtspsrc->stream_rec_lock);
|
||||||
g_static_rec_mutex_free (rtspsrc->state_rec_lock);
|
g_static_rec_mutex_free (rtspsrc->state_rec_lock);
|
||||||
g_free (rtspsrc->state_rec_lock);
|
g_free (rtspsrc->state_rec_lock);
|
||||||
g_static_rec_mutex_free (rtspsrc->conn_rec_lock);
|
|
||||||
g_free (rtspsrc->conn_rec_lock);
|
|
||||||
|
|
||||||
#ifdef G_OS_WIN32
|
#ifdef G_OS_WIN32
|
||||||
WSACleanup ();
|
WSACleanup ();
|
||||||
|
@ -1668,7 +1661,7 @@ gst_rtspsrc_flush (GstRTSPSrc * src, gboolean flush)
|
||||||
if (flush) {
|
if (flush) {
|
||||||
event = gst_event_new_flush_start ();
|
event = gst_event_new_flush_start ();
|
||||||
GST_DEBUG_OBJECT (src, "start flush");
|
GST_DEBUG_OBJECT (src, "start flush");
|
||||||
cmd = CMD_STOP;
|
cmd = CMD_WAIT;
|
||||||
state = GST_STATE_PAUSED;
|
state = GST_STATE_PAUSED;
|
||||||
} else {
|
} else {
|
||||||
event = gst_event_new_flush_stop ();
|
event = gst_event_new_flush_stop ();
|
||||||
|
@ -1718,12 +1711,10 @@ gst_rtspsrc_connection_send (GstRTSPSrc * src, GstRTSPConnection * conn,
|
||||||
{
|
{
|
||||||
GstRTSPResult ret;
|
GstRTSPResult ret;
|
||||||
|
|
||||||
GST_RTSP_CONN_LOCK (src);
|
|
||||||
if (conn)
|
if (conn)
|
||||||
ret = gst_rtsp_connection_send (conn, message, timeout);
|
ret = gst_rtsp_connection_send (conn, message, timeout);
|
||||||
else
|
else
|
||||||
ret = GST_RTSP_ERROR;
|
ret = GST_RTSP_ERROR;
|
||||||
GST_RTSP_CONN_UNLOCK (src);
|
|
||||||
|
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
@ -1734,12 +1725,10 @@ gst_rtspsrc_connection_receive (GstRTSPSrc * src, GstRTSPConnection * conn,
|
||||||
{
|
{
|
||||||
GstRTSPResult ret;
|
GstRTSPResult ret;
|
||||||
|
|
||||||
GST_RTSP_CONN_LOCK (src);
|
|
||||||
if (conn)
|
if (conn)
|
||||||
ret = gst_rtsp_connection_receive (conn, message, timeout);
|
ret = gst_rtsp_connection_receive (conn, message, timeout);
|
||||||
else
|
else
|
||||||
ret = GST_RTSP_ERROR;
|
ret = GST_RTSP_ERROR;
|
||||||
GST_RTSP_CONN_UNLOCK (src);
|
|
||||||
|
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
@ -3537,8 +3526,10 @@ gst_rtspsrc_loop_interleaved (GstRTSPSrc * src)
|
||||||
/* see if the timeout period expired */
|
/* see if the timeout period expired */
|
||||||
if ((tv_timeout.tv_sec | tv_timeout.tv_usec) == 0) {
|
if ((tv_timeout.tv_sec | tv_timeout.tv_usec) == 0) {
|
||||||
GST_DEBUG_OBJECT (src, "timout, sending keep-alive");
|
GST_DEBUG_OBJECT (src, "timout, sending keep-alive");
|
||||||
/* send keep-alive, ignore the result, a warning will be posted. */
|
/* send keep-alive, only act on interrupt, a warning will be posted for
|
||||||
gst_rtspsrc_send_keep_alive (src);
|
* other errors. */
|
||||||
|
if ((res = gst_rtspsrc_send_keep_alive (src)) == GST_RTSP_EINTR)
|
||||||
|
goto interrupt;
|
||||||
/* get new timeout */
|
/* get new timeout */
|
||||||
gst_rtsp_connection_next_timeout (src->conninfo.connection, &tv_timeout);
|
gst_rtsp_connection_next_timeout (src->conninfo.connection, &tv_timeout);
|
||||||
}
|
}
|
||||||
|
@ -3546,11 +3537,23 @@ gst_rtspsrc_loop_interleaved (GstRTSPSrc * src)
|
||||||
GST_DEBUG_OBJECT (src, "doing receive with timeout %ld seconds, %ld usec",
|
GST_DEBUG_OBJECT (src, "doing receive with timeout %ld seconds, %ld usec",
|
||||||
tv_timeout.tv_sec, tv_timeout.tv_usec);
|
tv_timeout.tv_sec, tv_timeout.tv_usec);
|
||||||
|
|
||||||
/* protect the connection with the connection lock so that we can see when
|
GST_OBJECT_LOCK (src);
|
||||||
* we are finished doing server communication */
|
if (src->loop_cmd == CMD_LOOP && !src->flushing) {
|
||||||
res =
|
src->waiting = TRUE;
|
||||||
gst_rtspsrc_connection_receive (src, src->conninfo.connection, &message,
|
GST_OBJECT_UNLOCK (src);
|
||||||
src->ptcp_timeout);
|
|
||||||
|
/* protect the connection with the connection lock so that we can see when
|
||||||
|
* we are finished doing server communication */
|
||||||
|
res =
|
||||||
|
gst_rtspsrc_connection_receive (src, src->conninfo.connection,
|
||||||
|
&message, src->ptcp_timeout);
|
||||||
|
|
||||||
|
GST_OBJECT_LOCK (src);
|
||||||
|
src->waiting = FALSE;
|
||||||
|
} else {
|
||||||
|
res = GST_RTSP_EINTR;
|
||||||
|
}
|
||||||
|
GST_OBJECT_UNLOCK (src);
|
||||||
|
|
||||||
switch (res) {
|
switch (res) {
|
||||||
case GST_RTSP_OK:
|
case GST_RTSP_OK:
|
||||||
|
@ -3562,7 +3565,8 @@ gst_rtspsrc_loop_interleaved (GstRTSPSrc * src)
|
||||||
case GST_RTSP_ETIMEOUT:
|
case GST_RTSP_ETIMEOUT:
|
||||||
/* no reply, send keep alive */
|
/* no reply, send keep alive */
|
||||||
GST_DEBUG_OBJECT (src, "timeout, sending keep-alive");
|
GST_DEBUG_OBJECT (src, "timeout, sending keep-alive");
|
||||||
gst_rtspsrc_send_keep_alive (src);
|
if ((res = gst_rtspsrc_send_keep_alive (src)) == GST_RTSP_EINTR)
|
||||||
|
goto interrupt;
|
||||||
continue;
|
continue;
|
||||||
case GST_RTSP_EEOF:
|
case GST_RTSP_EEOF:
|
||||||
/* go EOS when the server closed the connection */
|
/* go EOS when the server closed the connection */
|
||||||
|
@ -3779,12 +3783,25 @@ gst_rtspsrc_loop_udp (GstRTSPSrc * src)
|
||||||
(gint) tv_timeout.tv_sec);
|
(gint) tv_timeout.tv_sec);
|
||||||
|
|
||||||
gst_rtsp_message_unset (&message);
|
gst_rtsp_message_unset (&message);
|
||||||
/* we should continue reading the TCP socket because the server might
|
|
||||||
* send us requests. When the session timeout expires, we need to send a
|
GST_OBJECT_LOCK (src);
|
||||||
* keep-alive request to keep the session open. */
|
if (src->loop_cmd == CMD_LOOP && !src->flushing) {
|
||||||
res =
|
src->waiting = TRUE;
|
||||||
gst_rtspsrc_connection_receive (src, src->conninfo.connection,
|
GST_OBJECT_UNLOCK (src);
|
||||||
&message, &tv_timeout);
|
|
||||||
|
/* we should continue reading the TCP socket because the server might
|
||||||
|
* send us requests. When the session timeout expires, we need to send a
|
||||||
|
* keep-alive request to keep the session open. */
|
||||||
|
res =
|
||||||
|
gst_rtspsrc_connection_receive (src, src->conninfo.connection,
|
||||||
|
&message, &tv_timeout);
|
||||||
|
|
||||||
|
GST_OBJECT_LOCK (src);
|
||||||
|
src->waiting = FALSE;
|
||||||
|
} else {
|
||||||
|
res = GST_RTSP_EINTR;
|
||||||
|
}
|
||||||
|
GST_OBJECT_UNLOCK (src);
|
||||||
|
|
||||||
switch (res) {
|
switch (res) {
|
||||||
case GST_RTSP_OK:
|
case GST_RTSP_OK:
|
||||||
|
@ -3797,7 +3814,8 @@ gst_rtspsrc_loop_udp (GstRTSPSrc * src)
|
||||||
case GST_RTSP_ETIMEOUT:
|
case GST_RTSP_ETIMEOUT:
|
||||||
/* send keep-alive, ignore the result, a warning will be posted. */
|
/* send keep-alive, ignore the result, a warning will be posted. */
|
||||||
GST_DEBUG_OBJECT (src, "timeout, sending keep-alive");
|
GST_DEBUG_OBJECT (src, "timeout, sending keep-alive");
|
||||||
gst_rtspsrc_send_keep_alive (src);
|
if ((res = gst_rtspsrc_send_keep_alive (src)) == GST_RTSP_EINTR)
|
||||||
|
goto interrupt;
|
||||||
continue;
|
continue;
|
||||||
case GST_RTSP_EEOF:
|
case GST_RTSP_EEOF:
|
||||||
/* server closed the connection. not very fatal for UDP, reconnect and
|
/* server closed the connection. not very fatal for UDP, reconnect and
|
||||||
|
@ -3832,7 +3850,8 @@ gst_rtspsrc_loop_udp (GstRTSPSrc * src)
|
||||||
GST_DEBUG_OBJECT (src, "but is Unauthorized response ...");
|
GST_DEBUG_OBJECT (src, "but is Unauthorized response ...");
|
||||||
if (gst_rtspsrc_setup_auth (src, &message) && !(retry++)) {
|
if (gst_rtspsrc_setup_auth (src, &message) && !(retry++)) {
|
||||||
GST_DEBUG_OBJECT (src, "so retrying keep-alive");
|
GST_DEBUG_OBJECT (src, "so retrying keep-alive");
|
||||||
gst_rtspsrc_send_keep_alive (src);
|
if ((res = gst_rtspsrc_send_keep_alive (src)) == GST_RTSP_EINTR)
|
||||||
|
goto interrupt;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
retry = 0;
|
retry = 0;
|
||||||
|
@ -3857,7 +3876,6 @@ interrupt:
|
||||||
if (src->loop_cmd != CMD_RECONNECT)
|
if (src->loop_cmd != CMD_RECONNECT)
|
||||||
goto stopping;
|
goto stopping;
|
||||||
|
|
||||||
|
|
||||||
/* when we get here we have to reconnect using tcp */
|
/* when we get here we have to reconnect using tcp */
|
||||||
src->loop_cmd = CMD_LOOP;
|
src->loop_cmd = CMD_LOOP;
|
||||||
|
|
||||||
|
@ -3974,12 +3992,14 @@ gst_rtspsrc_loop_send_cmd (GstRTSPSrc * src, gint cmd, gboolean flush)
|
||||||
{
|
{
|
||||||
GST_OBJECT_LOCK (src);
|
GST_OBJECT_LOCK (src);
|
||||||
src->loop_cmd = cmd;
|
src->loop_cmd = cmd;
|
||||||
|
src->flushing = flush;
|
||||||
if (flush) {
|
if (flush) {
|
||||||
GST_DEBUG_OBJECT (src, "start connection flush");
|
if (src->waiting) {
|
||||||
gst_rtspsrc_connection_flush (src, TRUE);
|
GST_DEBUG_OBJECT (src, "start connection flush");
|
||||||
|
gst_rtspsrc_connection_flush (src, TRUE);
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
GST_DEBUG_OBJECT (src, "stop connection flush");
|
GST_DEBUG_OBJECT (src, "stop connection flush");
|
||||||
gst_rtspsrc_connection_flush (src, FALSE);
|
|
||||||
}
|
}
|
||||||
if (src->task)
|
if (src->task)
|
||||||
gst_task_start (src->task);
|
gst_task_start (src->task);
|
||||||
|
@ -5862,13 +5882,6 @@ gst_rtspsrc_play (GstRTSPSrc * src, GstSegment * segment, gboolean async)
|
||||||
if (!src->conninfo.connection || !src->conninfo.connected)
|
if (!src->conninfo.connection || !src->conninfo.connected)
|
||||||
goto done;
|
goto done;
|
||||||
|
|
||||||
/* waiting for connection idle, we were flushing so any attempt at doing data
|
|
||||||
* transfer will result in pausing the tasks. */
|
|
||||||
GST_DEBUG_OBJECT (src, "wait for connection idle");
|
|
||||||
GST_RTSP_CONN_LOCK (src);
|
|
||||||
GST_DEBUG_OBJECT (src, "connection is idle now");
|
|
||||||
GST_RTSP_CONN_UNLOCK (src);
|
|
||||||
|
|
||||||
/* send some dummy packets before we activate the receive in the
|
/* send some dummy packets before we activate the receive in the
|
||||||
* udp sources */
|
* udp sources */
|
||||||
gst_rtspsrc_send_dummy_packets (src);
|
gst_rtspsrc_send_dummy_packets (src);
|
||||||
|
@ -6072,13 +6085,6 @@ gst_rtspsrc_pause (GstRTSPSrc * src, gboolean idle, gboolean async)
|
||||||
if (src->state == GST_RTSP_STATE_READY)
|
if (src->state == GST_RTSP_STATE_READY)
|
||||||
goto was_paused;
|
goto was_paused;
|
||||||
|
|
||||||
/* waiting for connection idle, we were flushing so any attempt at doing data
|
|
||||||
* transfer will result in pausing the tasks. */
|
|
||||||
GST_DEBUG_OBJECT (src, "wait for connection idle");
|
|
||||||
GST_RTSP_CONN_LOCK (src);
|
|
||||||
GST_DEBUG_OBJECT (src, "connection is idle now");
|
|
||||||
GST_RTSP_CONN_UNLOCK (src);
|
|
||||||
|
|
||||||
if (!src->conninfo.connection || !src->conninfo.connected)
|
if (!src->conninfo.connection || !src->conninfo.connected)
|
||||||
goto no_connection;
|
goto no_connection;
|
||||||
|
|
||||||
|
@ -6272,7 +6278,8 @@ gst_rtspsrc_thread (GstRTSPSrc * src)
|
||||||
|
|
||||||
GST_OBJECT_LOCK (src);
|
GST_OBJECT_LOCK (src);
|
||||||
cmd = src->loop_cmd;
|
cmd = src->loop_cmd;
|
||||||
src->loop_cmd = CMD_WAIT;
|
if (cmd != CMD_LOOP || src->flushing)
|
||||||
|
src->loop_cmd = CMD_WAIT;
|
||||||
GST_DEBUG_OBJECT (src, "got command %d", cmd);
|
GST_DEBUG_OBJECT (src, "got command %d", cmd);
|
||||||
GST_OBJECT_UNLOCK (src);
|
GST_OBJECT_UNLOCK (src);
|
||||||
|
|
||||||
|
@ -6301,8 +6308,12 @@ gst_rtspsrc_thread (GstRTSPSrc * src)
|
||||||
|
|
||||||
GST_OBJECT_LOCK (src);
|
GST_OBJECT_LOCK (src);
|
||||||
/* and go back to sleep */
|
/* and go back to sleep */
|
||||||
if (!running && src->loop_cmd == CMD_WAIT && src->task)
|
if (src->loop_cmd == CMD_WAIT) {
|
||||||
gst_task_pause (src->task);
|
if (running)
|
||||||
|
src->loop_cmd = CMD_LOOP;
|
||||||
|
else if (src->task)
|
||||||
|
gst_task_pause (src->task);
|
||||||
|
}
|
||||||
GST_OBJECT_UNLOCK (src);
|
GST_OBJECT_UNLOCK (src);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -79,10 +79,6 @@ typedef struct _GstRTSPSrcClass GstRTSPSrcClass;
|
||||||
#define GST_RTSP_STREAM_LOCK(rtsp) (g_static_rec_mutex_lock (GST_RTSP_STREAM_GET_LOCK(rtsp)))
|
#define GST_RTSP_STREAM_LOCK(rtsp) (g_static_rec_mutex_lock (GST_RTSP_STREAM_GET_LOCK(rtsp)))
|
||||||
#define GST_RTSP_STREAM_UNLOCK(rtsp) (g_static_rec_mutex_unlock (GST_RTSP_STREAM_GET_LOCK(rtsp)))
|
#define GST_RTSP_STREAM_UNLOCK(rtsp) (g_static_rec_mutex_unlock (GST_RTSP_STREAM_GET_LOCK(rtsp)))
|
||||||
|
|
||||||
#define GST_RTSP_CONN_GET_LOCK(rtsp) (GST_RTSPSRC_CAST(rtsp)->conn_rec_lock)
|
|
||||||
#define GST_RTSP_CONN_LOCK(rtsp) (g_static_rec_mutex_lock (GST_RTSP_CONN_GET_LOCK(rtsp)))
|
|
||||||
#define GST_RTSP_CONN_UNLOCK(rtsp) (g_static_rec_mutex_unlock (GST_RTSP_CONN_GET_LOCK(rtsp)))
|
|
||||||
|
|
||||||
typedef struct _GstRTSPConnInfo GstRTSPConnInfo;
|
typedef struct _GstRTSPConnInfo GstRTSPConnInfo;
|
||||||
|
|
||||||
struct _GstRTSPConnInfo {
|
struct _GstRTSPConnInfo {
|
||||||
|
@ -184,13 +180,12 @@ struct _GstRTSPSrc {
|
||||||
/* UDP mode loop */
|
/* UDP mode loop */
|
||||||
gint loop_cmd;
|
gint loop_cmd;
|
||||||
gboolean ignore_timeout;
|
gboolean ignore_timeout;
|
||||||
|
gboolean flushing;
|
||||||
|
gboolean waiting;
|
||||||
|
|
||||||
/* mutex for protecting state changes */
|
/* mutex for protecting state changes */
|
||||||
GStaticRecMutex *state_rec_lock;
|
GStaticRecMutex *state_rec_lock;
|
||||||
|
|
||||||
/* mutex for protecting the connection */
|
|
||||||
GStaticRecMutex *conn_rec_lock;
|
|
||||||
|
|
||||||
GstSDPMessage *sdp;
|
GstSDPMessage *sdp;
|
||||||
gboolean from_sdp;
|
gboolean from_sdp;
|
||||||
gint numstreams;
|
gint numstreams;
|
||||||
|
|
Loading…
Reference in a new issue