rtspsrc: fix and improve async handling

Simplify the command handling; passing a command to thread means we really
want it to get the message, which means to always flush provided the command
can handle being interrupted.  Command thread indicates whether command
allows interruption and ensure non-flushing connection as it subsequently
needs it.

In particular, this also makes the TEARDOWN sequence interruptable
and also prevents races where _loop_ could miss a command and would
continue receiving (or at least trying to).

See #632504.
This commit is contained in:
Mark Nauwelaerts 2011-04-06 15:49:01 +02:00
parent e6798ad54c
commit f7ddf811d7
2 changed files with 23 additions and 45 deletions

View file

@ -3540,23 +3540,11 @@ gst_rtspsrc_loop_interleaved (GstRTSPSrc * src)
GST_DEBUG_OBJECT (src, "doing receive with timeout %ld seconds, %ld usec",
tv_timeout.tv_sec, tv_timeout.tv_usec);
GST_OBJECT_LOCK (src);
if (!src->flushing) {
src->waiting = TRUE;
GST_OBJECT_UNLOCK (src);
/* protect the connection with the connection lock so that we can see when
* we are finished doing server communication */
res =
gst_rtspsrc_connection_receive (src, src->conninfo.connection,
&message, src->ptcp_timeout);
GST_OBJECT_LOCK (src);
src->waiting = FALSE;
} else {
res = GST_RTSP_EINTR;
}
GST_OBJECT_UNLOCK (src);
/* protect the connection with the connection lock so that we can see when
* we are finished doing server communication */
res =
gst_rtspsrc_connection_receive (src, src->conninfo.connection,
&message, src->ptcp_timeout);
switch (res) {
case GST_RTSP_OK:
@ -3786,24 +3774,11 @@ gst_rtspsrc_loop_udp (GstRTSPSrc * src)
gst_rtsp_message_unset (&message);
GST_OBJECT_LOCK (src);
if (!src->flushing) {
src->waiting = TRUE;
GST_OBJECT_UNLOCK (src);
/* we should continue reading the TCP socket because the server might
* send us requests. When the session timeout expires, we need to send a
* keep-alive request to keep the session open. */
res =
gst_rtspsrc_connection_receive (src, src->conninfo.connection,
&message, &tv_timeout);
GST_OBJECT_LOCK (src);
src->waiting = FALSE;
} else {
res = GST_RTSP_EINTR;
}
GST_OBJECT_UNLOCK (src);
/* we should continue reading the TCP socket because the server might
* send us requests. When the session timeout expires, we need to send a
* keep-alive request to keep the session open. */
res = gst_rtspsrc_connection_receive (src, src->conninfo.connection,
&message, &tv_timeout);
switch (res) {
case GST_RTSP_OK:
@ -3942,7 +3917,6 @@ gst_rtspsrc_reconnect (GstRTSPSrc * src, gboolean async)
/* only restart when the pads were not yet activated, else we were
* streaming over UDP */
restart = src->need_activate;
src->flushing = FALSE;
GST_OBJECT_UNLOCK (src);
/* no need to restart, we're done */
@ -4102,6 +4076,8 @@ gst_rtspsrc_loop_send_cmd (GstRTSPSrc * src, gint cmd, gboolean flush)
{
gint old;
/* FIXME flush param mute; remove at discretion */
/* start new request */
gst_rtspsrc_loop_start_cmd (src, cmd);
@ -4115,14 +4091,10 @@ gst_rtspsrc_loop_send_cmd (GstRTSPSrc * src, gint cmd, gboolean flush)
GST_OBJECT_LOCK (src);
}
src->loop_cmd = cmd;
src->flushing = flush;
if (flush) {
if (src->waiting) {
GST_DEBUG_OBJECT (src, "start connection flush");
gst_rtspsrc_connection_flush (src, TRUE);
}
} else {
GST_DEBUG_OBJECT (src, "stop connection flush");
/* interrupt if allowed */
if (src->waiting) {
GST_DEBUG_OBJECT (src, "start connection flush");
gst_rtspsrc_connection_flush (src, TRUE);
}
if (src->task)
gst_task_start (src->task);
@ -6426,6 +6398,13 @@ gst_rtspsrc_thread (GstRTSPSrc * src)
cmd = src->loop_cmd;
src->loop_cmd = CMD_WAIT;
GST_DEBUG_OBJECT (src, "got command %d", cmd);
/* we got the message command, so ensure communication is possible again */
gst_rtspsrc_connection_flush (src, FALSE);
/* we allow these to be interrupted */
if (cmd == CMD_LOOP || cmd == CMD_CLOSE)
src->waiting = TRUE;
GST_OBJECT_UNLOCK (src);
switch (cmd) {

View file

@ -180,7 +180,6 @@ struct _GstRTSPSrc {
/* UDP mode loop */
gint loop_cmd;
gboolean ignore_timeout;
gboolean flushing;
gboolean waiting;
gboolean open_error;