rtspsrc: rework reconnect code

Use the same async code path to implement reconnects.
Make sure we only post progress messages when doing async things.
This commit is contained in:
Wim Taymans 2011-01-07 18:02:49 +01:00 committed by Mark Nauwelaerts
parent c27c10f8f4
commit 2513207433

View file

@ -3277,7 +3277,8 @@ gst_rtspsrc_push_event (GstRTSPSrc * src, GstEvent * event, gboolean source)
} }
static GstRTSPResult static GstRTSPResult
gst_rtsp_conninfo_connect (GstRTSPSrc * src, GstRTSPConnInfo * info) gst_rtsp_conninfo_connect (GstRTSPSrc * src, GstRTSPConnInfo * info,
gboolean async)
{ {
GstRTSPResult res; GstRTSPResult res;
@ -3312,6 +3313,7 @@ gst_rtsp_conninfo_connect (GstRTSPSrc * src, GstRTSPConnInfo * info)
if (!info->connected) { if (!info->connected) {
/* connect */ /* connect */
if (async)
GST_ELEMENT_PROGRESS (src, CONTINUE, "connect", GST_ELEMENT_PROGRESS (src, CONTINUE, "connect",
("Connecting to %s", info->location)); ("Connecting to %s", info->location));
GST_DEBUG_OBJECT (src, "connecting (%s)...", info->location); GST_DEBUG_OBJECT (src, "connecting (%s)...", info->location);
@ -3365,13 +3367,14 @@ gst_rtsp_conninfo_close (GstRTSPSrc * src, GstRTSPConnInfo * info,
} }
static GstRTSPResult static GstRTSPResult
gst_rtsp_conninfo_reconnect (GstRTSPSrc * src, GstRTSPConnInfo * info) gst_rtsp_conninfo_reconnect (GstRTSPSrc * src, GstRTSPConnInfo * info,
gboolean async)
{ {
GstRTSPResult res; GstRTSPResult res;
GST_DEBUG_OBJECT (src, "reconnecting connection..."); GST_DEBUG_OBJECT (src, "reconnecting connection...");
gst_rtsp_conninfo_close (src, info, FALSE); gst_rtsp_conninfo_close (src, info, FALSE);
res = gst_rtsp_conninfo_connect (src, info); res = gst_rtsp_conninfo_connect (src, info, async);
return res; return res;
} }
@ -3768,7 +3771,6 @@ invalid_length:
static GstFlowReturn static GstFlowReturn
gst_rtspsrc_loop_udp (GstRTSPSrc * src) gst_rtspsrc_loop_udp (GstRTSPSrc * src)
{ {
gboolean restart = FALSE;
GstRTSPResult res; GstRTSPResult res;
GstRTSPMessage message = { 0 }; GstRTSPMessage message = { 0 };
gint retry = 0; gint retry = 0;
@ -3809,7 +3811,6 @@ gst_rtspsrc_loop_udp (GstRTSPSrc * src)
break; break;
case GST_RTSP_EINTR: case GST_RTSP_EINTR:
/* we got interrupted, see what we have to do */ /* we got interrupted, see what we have to do */
GST_DEBUG_OBJECT (src, "got interrupted: stop connection flush");
goto interrupt; goto interrupt;
case GST_RTSP_ETIMEOUT: case GST_RTSP_ETIMEOUT:
/* send keep-alive, ignore the result, a warning will be posted. */ /* send keep-alive, ignore the result, a warning will be posted. */
@ -3822,7 +3823,8 @@ gst_rtspsrc_loop_udp (GstRTSPSrc * src)
* see what happens. */ * see what happens. */
GST_ELEMENT_WARNING (src, RESOURCE, READ, (NULL), GST_ELEMENT_WARNING (src, RESOURCE, READ, (NULL),
("The server closed the connection.")); ("The server closed the connection."));
if ((res = gst_rtsp_conninfo_reconnect (src, &src->conninfo)) < 0) if ((res =
gst_rtsp_conninfo_reconnect (src, &src->conninfo, FALSE)) < 0)
goto connect_error; goto connect_error;
continue; continue;
@ -3868,17 +3870,79 @@ gst_rtspsrc_loop_udp (GstRTSPSrc * src)
} }
} }
interrupt:
/* we get here when the connection got interrupted */ /* we get here when the connection got interrupted */
GST_OBJECT_LOCK (src); interrupt:
{
gst_rtsp_message_unset (&message);
GST_DEBUG_OBJECT (src, "got interrupted: stop connection flush");
gst_rtspsrc_connection_flush (src, FALSE); gst_rtspsrc_connection_flush (src, FALSE);
GST_DEBUG_OBJECT (src, "we have command %d", src->loop_cmd); return GST_FLOW_WRONG_STATE;
if (src->loop_cmd != CMD_RECONNECT) }
goto stopping; connect_error:
{
gchar *str = gst_rtsp_strresult (res);
GstFlowReturn ret;
src->conninfo.connected = FALSE;
if (res != GST_RTSP_EINTR) {
GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ_WRITE, (NULL),
("Could not connect to server. (%s)", str));
g_free (str);
ret = GST_FLOW_ERROR;
} else {
ret = GST_FLOW_WRONG_STATE;
}
return ret;
}
receive_error:
{
gchar *str = gst_rtsp_strresult (res);
GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
("Could not receive message. (%s)", str));
g_free (str);
return GST_FLOW_ERROR;
}
handle_request_failed:
{
gchar *str = gst_rtsp_strresult (res);
GstFlowReturn ret;
gst_rtsp_message_unset (&message);
if (res != GST_RTSP_EINTR) {
GST_ELEMENT_ERROR (src, RESOURCE, WRITE, (NULL),
("Could not handle server message. (%s)", str));
g_free (str);
ret = GST_FLOW_ERROR;
} else {
ret = GST_FLOW_WRONG_STATE;
}
return ret;
}
server_eof:
{
GST_DEBUG_OBJECT (src, "we got an eof from the server");
GST_ELEMENT_WARNING (src, RESOURCE, READ, (NULL),
("The server closed the connection."));
src->conninfo.connected = FALSE;
gst_rtsp_message_unset (&message);
return GST_FLOW_UNEXPECTED;
}
}
static GstRTSPResult
gst_rtspsrc_reconnect (GstRTSPSrc * src, gboolean async)
{
GstRTSPResult res = GST_RTSP_OK;
gboolean restart;
GST_DEBUG_OBJECT (src, "doing reconnect");
GST_OBJECT_LOCK (src);
/* only restart when the pads were not yet activated, else we were /* only restart when the pads were not yet activated, else we were
* streaming over UDP */ * streaming over UDP */
restart = src->need_activate; restart = src->need_activate;
src->flushing = FALSE;
GST_OBJECT_UNLOCK (src); GST_OBJECT_UNLOCK (src);
/* no need to restart, we're done */ /* no need to restart, we're done */
@ -3889,10 +3953,12 @@ interrupt:
src->cur_protocols = GST_RTSP_LOWER_TRANS_TCP; src->cur_protocols = GST_RTSP_LOWER_TRANS_TCP;
/* pause to prepare for a restart */ /* pause to prepare for a restart */
gst_rtspsrc_pause (src, FALSE, FALSE); if ((res = gst_rtspsrc_pause (src, FALSE, async)) < 0)
goto done;
/* close and cleanup our state */ /* close and cleanup our state */
gst_rtspsrc_close (src, FALSE); if ((res = gst_rtspsrc_close (src, async)) < 0)
goto done;
/* see if we have TCP left to try. Also don't try TCP when we were configured /* see if we have TCP left to try. Also don't try TCP when we were configured
* with an SDP. */ * with an SDP. */
@ -3907,52 +3973,17 @@ interrupt:
gst_guint64_to_gdouble (src->udp_timeout / 1000000.0))); gst_guint64_to_gdouble (src->udp_timeout / 1000000.0)));
/* open new connection using tcp */ /* open new connection using tcp */
if (!gst_rtspsrc_open (src, FALSE)) if (gst_rtspsrc_open (src, async) < 0)
goto open_failed; goto open_failed;
/* start playback */ /* start playback */
if (!gst_rtspsrc_play (src, &src->segment, FALSE)) if (gst_rtspsrc_play (src, &src->segment, async) < 0)
goto play_failed; goto play_failed;
done: done:
return GST_FLOW_OK; return res;
/* ERRORS */ /* ERRORS */
stopping:
{
GST_DEBUG_OBJECT (src, "we are stopping");
GST_OBJECT_UNLOCK (src);
return GST_FLOW_WRONG_STATE;
}
receive_error:
{
gchar *str = gst_rtsp_strresult (res);
GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
("Could not receive message. (%s)", str));
g_free (str);
return GST_FLOW_ERROR;
}
handle_request_failed:
{
gchar *str = gst_rtsp_strresult (res);
GST_ELEMENT_ERROR (src, RESOURCE, WRITE, (NULL),
("Could not handle server message. (%s)", str));
g_free (str);
gst_rtsp_message_unset (&message);
return GST_FLOW_ERROR;
}
connect_error:
{
gchar *str = gst_rtsp_strresult (res);
src->conninfo.connected = FALSE;
GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ_WRITE, (NULL),
("Could not connect to server. (%s)", str));
g_free (str);
return GST_FLOW_ERROR;
}
no_protocols: no_protocols:
{ {
src->cur_protocols = 0; src->cur_protocols = 0;
@ -3973,15 +4004,6 @@ play_failed:
GST_DEBUG_OBJECT (src, "play failed"); GST_DEBUG_OBJECT (src, "play failed");
return GST_FLOW_OK; return GST_FLOW_OK;
} }
server_eof:
{
GST_DEBUG_OBJECT (src, "we got an eof from the server");
GST_ELEMENT_WARNING (src, RESOURCE, READ, (NULL),
("The server closed the connection."));
src->conninfo.connected = FALSE;
gst_rtsp_message_unset (&message);
return GST_FLOW_UNEXPECTED;
}
} }
static void static void
@ -4535,7 +4557,9 @@ receive_error:
if (try == 0) { if (try == 0) {
try++; try++;
/* if reconnect succeeds, try again */ /* if reconnect succeeds, try again */
if ((res = gst_rtsp_conninfo_reconnect (src, &src->conninfo)) == 0) if ((res =
gst_rtsp_conninfo_reconnect (src, &src->conninfo,
FALSE)) == 0)
goto again; goto again;
} }
/* only try once after reconnect, then fallthrough and error out */ /* only try once after reconnect, then fallthrough and error out */
@ -5068,7 +5092,7 @@ gst_rtspsrc_setup_streams (GstRTSPSrc * src, gboolean async)
} }
if (src->conninfo.connection == NULL) { if (src->conninfo.connection == NULL) {
if (!gst_rtsp_conninfo_connect (src, &stream->conninfo)) { if (!gst_rtsp_conninfo_connect (src, &stream->conninfo, async)) {
GST_DEBUG_OBJECT (src, "skipping stream %p, failed to connect", stream); GST_DEBUG_OBJECT (src, "skipping stream %p, failed to connect", stream);
continue; continue;
} }
@ -5469,7 +5493,7 @@ gst_rtspsrc_open_from_sdp (GstRTSPSrc * src, GstSDPMessage * sdp,
src->conninfo.location = g_strdup (control); src->conninfo.location = g_strdup (control);
/* make a connection for this, if there was a connection already, nothing /* make a connection for this, if there was a connection already, nothing
* happens. */ * happens. */
if (gst_rtsp_conninfo_connect (src, &src->conninfo) < 0) { if (gst_rtsp_conninfo_connect (src, &src->conninfo, async) < 0) {
GST_ERROR_OBJECT (src, "could not connect"); GST_ERROR_OBJECT (src, "could not connect");
} }
} }
@ -5527,7 +5551,7 @@ restart:
goto no_url; goto no_url;
src->tried_url_auth = FALSE; src->tried_url_auth = FALSE;
if ((res = gst_rtsp_conninfo_connect (src, &src->conninfo)) < 0) if ((res = gst_rtsp_conninfo_connect (src, &src->conninfo, async)) < 0)
goto connect_failed; goto connect_failed;
/* create OPTIONS */ /* create OPTIONS */
@ -6381,6 +6405,11 @@ gst_rtspsrc_thread (GstRTSPSrc * src)
case CMD_LOOP: case CMD_LOOP:
running = gst_rtspsrc_loop (src); running = gst_rtspsrc_loop (src);
break; break;
case CMD_RECONNECT:
ret = gst_rtspsrc_reconnect (src, FALSE);
if (ret == GST_RTSP_OK)
running = TRUE;
break;
default: default:
break; break;
} }