rtspsrc: handle data messages in separate method

Refactor and make a method to handle a data message.
This commit is contained in:
Wim Taymans 2013-06-26 14:27:34 +02:00
parent a4be0c6de3
commit 61219dc6ed

View file

@ -3761,98 +3761,19 @@ send_error:
} }
static GstFlowReturn static GstFlowReturn
gst_rtspsrc_loop_interleaved (GstRTSPSrc * src) gst_rtspsrc_handle_data (GstRTSPSrc * src, GstRTSPMessage * message)
{ {
GstRTSPMessage message = { 0 }; GstFlowReturn ret = GST_FLOW_OK;
GstRTSPResult res;
gint channel; gint channel;
GstRTSPStream *stream; GstRTSPStream *stream;
GstPad *outpad = NULL; GstPad *outpad = NULL;
guint8 *data; guint8 *data;
guint size; guint size;
GstFlowReturn ret = GST_FLOW_OK;
GstBuffer *buf; GstBuffer *buf;
gboolean is_rtcp, have_data; gboolean is_rtcp;
GstEvent *event; GstEvent *event;
/* here we are only interested in data messages */ channel = message->type_data.data.channel;
have_data = FALSE;
do {
GTimeVal tv_timeout;
/* get the next timeout interval */
gst_rtsp_connection_next_timeout (src->conninfo.connection, &tv_timeout);
/* see if the timeout period expired */
if ((tv_timeout.tv_sec | tv_timeout.tv_usec) == 0) {
GST_DEBUG_OBJECT (src, "timout, sending keep-alive");
/* send keep-alive, only act on interrupt, a warning will be posted for
* other errors. */
if ((res = gst_rtspsrc_send_keep_alive (src)) == GST_RTSP_EINTR)
goto interrupt;
/* get new timeout */
gst_rtsp_connection_next_timeout (src->conninfo.connection, &tv_timeout);
}
GST_DEBUG_OBJECT (src, "doing receive with timeout %ld seconds, %ld usec",
tv_timeout.tv_sec, tv_timeout.tv_usec);
/* protect the connection with the connection lock so that we can see when
* we are finished doing server communication */
res =
gst_rtspsrc_connection_receive (src, src->conninfo.connection,
&message, src->ptcp_timeout);
switch (res) {
case GST_RTSP_OK:
GST_DEBUG_OBJECT (src, "we received a server message");
break;
case GST_RTSP_EINTR:
/* we got interrupted this means we need to stop */
goto interrupt;
case GST_RTSP_ETIMEOUT:
/* no reply, send keep alive */
GST_DEBUG_OBJECT (src, "timeout, sending keep-alive");
if ((res = gst_rtspsrc_send_keep_alive (src)) == GST_RTSP_EINTR)
goto interrupt;
continue;
case GST_RTSP_EEOF:
/* go EOS when the server closed the connection */
goto server_eof;
default:
goto receive_error;
}
switch (message.type) {
case GST_RTSP_MESSAGE_REQUEST:
/* server sends us a request message, handle it */
res =
gst_rtspsrc_handle_request (src, src->conninfo.connection,
&message);
if (res == GST_RTSP_EEOF)
goto server_eof;
else if (res < 0)
goto handle_request_failed;
break;
case GST_RTSP_MESSAGE_RESPONSE:
/* we ignore response messages */
GST_DEBUG_OBJECT (src, "ignoring response message");
if (src->debug)
gst_rtsp_message_dump (&message);
break;
case GST_RTSP_MESSAGE_DATA:
GST_DEBUG_OBJECT (src, "got data message");
have_data = TRUE;
break;
default:
GST_WARNING_OBJECT (src, "ignoring unknown message type %d",
message.type);
break;
}
}
while (!have_data);
channel = message.type_data.data.channel;
stream = find_stream (src, &channel, (gpointer) find_stream_by_channel); stream = find_stream (src, &channel, (gpointer) find_stream_by_channel);
if (!stream) if (!stream)
@ -3869,7 +3790,7 @@ gst_rtspsrc_loop_interleaved (GstRTSPSrc * src)
} }
/* take a look at the body to figure out what we have */ /* take a look at the body to figure out what we have */
gst_rtsp_message_get_body (&message, &data, &size); gst_rtsp_message_get_body (message, &data, &size);
if (size < 2) if (size < 2)
goto invalid_length; goto invalid_length;
@ -3885,7 +3806,7 @@ gst_rtspsrc_loop_interleaved (GstRTSPSrc * src)
goto unknown_stream; goto unknown_stream;
/* take the message body for further processing */ /* take the message body for further processing */
gst_rtsp_message_steal_body (&message, &data, &size); gst_rtsp_message_steal_body (message, &data, &size);
/* strip the trailing \0 */ /* strip the trailing \0 */
size -= 1; size -= 1;
@ -3895,7 +3816,7 @@ gst_rtspsrc_loop_interleaved (GstRTSPSrc * src)
gst_memory_new_wrapped (0, data, size, 0, size, data, g_free)); gst_memory_new_wrapped (0, data, size, 0, size, data, g_free));
/* don't need message anymore */ /* don't need message anymore */
gst_rtsp_message_unset (&message); gst_rtsp_message_unset (message);
GST_DEBUG_OBJECT (src, "pushing data of size %d on channel %d", size, GST_DEBUG_OBJECT (src, "pushing data of size %d on channel %d", size,
channel); channel);
@ -3977,9 +3898,102 @@ gst_rtspsrc_loop_interleaved (GstRTSPSrc * src)
unknown_stream: unknown_stream:
{ {
GST_DEBUG_OBJECT (src, "unknown stream on channel %d, ignored", channel); GST_DEBUG_OBJECT (src, "unknown stream on channel %d, ignored", channel);
gst_rtsp_message_unset (&message); gst_rtsp_message_unset (message);
return GST_FLOW_OK; return GST_FLOW_OK;
} }
invalid_length:
{
GST_ELEMENT_WARNING (src, RESOURCE, READ, (NULL),
("Short message received, ignoring."));
gst_rtsp_message_unset (message);
return GST_FLOW_OK;
}
}
static GstFlowReturn
gst_rtspsrc_loop_interleaved (GstRTSPSrc * src)
{
GstRTSPMessage message = { 0 };
GstRTSPResult res;
GstFlowReturn ret = GST_FLOW_OK;
GTimeVal tv_timeout;
while (TRUE) {
/* get the next timeout interval */
gst_rtsp_connection_next_timeout (src->conninfo.connection, &tv_timeout);
/* see if the timeout period expired */
if ((tv_timeout.tv_sec | tv_timeout.tv_usec) == 0) {
GST_DEBUG_OBJECT (src, "timout, sending keep-alive");
/* send keep-alive, only act on interrupt, a warning will be posted for
* other errors. */
if ((res = gst_rtspsrc_send_keep_alive (src)) == GST_RTSP_EINTR)
goto interrupt;
/* get new timeout */
gst_rtsp_connection_next_timeout (src->conninfo.connection, &tv_timeout);
}
GST_DEBUG_OBJECT (src, "doing receive with timeout %ld seconds, %ld usec",
tv_timeout.tv_sec, tv_timeout.tv_usec);
/* protect the connection with the connection lock so that we can see when
* we are finished doing server communication */
res =
gst_rtspsrc_connection_receive (src, src->conninfo.connection,
&message, src->ptcp_timeout);
switch (res) {
case GST_RTSP_OK:
GST_DEBUG_OBJECT (src, "we received a server message");
break;
case GST_RTSP_EINTR:
/* we got interrupted this means we need to stop */
goto interrupt;
case GST_RTSP_ETIMEOUT:
/* no reply, send keep alive */
GST_DEBUG_OBJECT (src, "timeout, sending keep-alive");
if ((res = gst_rtspsrc_send_keep_alive (src)) == GST_RTSP_EINTR)
goto interrupt;
continue;
case GST_RTSP_EEOF:
/* go EOS when the server closed the connection */
goto server_eof;
default:
goto receive_error;
}
switch (message.type) {
case GST_RTSP_MESSAGE_REQUEST:
/* server sends us a request message, handle it */
res =
gst_rtspsrc_handle_request (src, src->conninfo.connection,
&message);
if (res == GST_RTSP_EEOF)
goto server_eof;
else if (res < 0)
goto handle_request_failed;
break;
case GST_RTSP_MESSAGE_RESPONSE:
/* we ignore response messages */
GST_DEBUG_OBJECT (src, "ignoring response message");
if (src->debug)
gst_rtsp_message_dump (&message);
break;
case GST_RTSP_MESSAGE_DATA:
GST_DEBUG_OBJECT (src, "got data message");
ret = gst_rtspsrc_handle_data (src, &message);
if (ret != GST_FLOW_OK)
goto handle_data_failed;
break;
default:
GST_WARNING_OBJECT (src, "ignoring unknown message type %d",
message.type);
break;
}
}
g_assert_not_reached ();
/* ERRORS */
server_eof: server_eof:
{ {
GST_DEBUG_OBJECT (src, "we got an eof from the server"); GST_DEBUG_OBJECT (src, "we got an eof from the server");
@ -4017,12 +4031,10 @@ handle_request_failed:
gst_rtsp_message_unset (&message); gst_rtsp_message_unset (&message);
return GST_FLOW_ERROR; return GST_FLOW_ERROR;
} }
invalid_length: handle_data_failed:
{ {
GST_ELEMENT_WARNING (src, RESOURCE, READ, (NULL), GST_DEBUG_OBJECT (src, "could no handle data message");
("Short message received, ignoring.")); return ret;
gst_rtsp_message_unset (&message);
return GST_FLOW_OK;
} }
} }
@ -4121,6 +4133,7 @@ gst_rtspsrc_loop_udp (GstRTSPSrc * src)
break; break;
} }
} }
g_assert_not_reached ();
/* we get here when the connection got interrupted */ /* we get here when the connection got interrupted */
interrupt: interrupt: