gst/rtsp/gstrtpdec.c: Add pads after setting them up.

Original commit message from CVS:
* gst/rtsp/gstrtpdec.c: (gst_rtpdec_init), (gst_rtpdec_getcaps):
Add pads after setting them up.
* gst/rtsp/gstrtspsrc.c: (gst_rtspsrc_class_init),
(gst_rtspsrc_init), (gst_rtspsrc_finalize),
(gst_rtspsrc_free_stream), (gst_rtspsrc_media_to_caps),
(gst_rtspsrc_stream_setup_rtp),
(gst_rtspsrc_stream_configure_transport),
(gst_rtspsrc_combine_flows), (gst_rtspsrc_loop),
(gst_rtspsrc_open), (gst_rtspsrc_close), (gst_rtspsrc_play),
(gst_rtspsrc_pause):
* gst/rtsp/gstrtspsrc.h:
Fix interleaved mode.
- Protect streaming with lock.
- Combine flows
- set caps on outgoing buffers.
- strip trailing \0 from data packets.
- Configure RTP/RTCP in stream.
Use DEBUG_OBJECT more.
This commit is contained in:
Wim Taymans 2006-08-16 09:48:26 +00:00
parent 64faced49c
commit 6eedcfbc8c
4 changed files with 227 additions and 69 deletions

View file

@ -1,3 +1,25 @@
2006-08-16 Wim Taymans <wim@fluendo.com>
* gst/rtsp/gstrtpdec.c: (gst_rtpdec_init), (gst_rtpdec_getcaps):
Add pads after setting them up.
* gst/rtsp/gstrtspsrc.c: (gst_rtspsrc_class_init),
(gst_rtspsrc_init), (gst_rtspsrc_finalize),
(gst_rtspsrc_free_stream), (gst_rtspsrc_media_to_caps),
(gst_rtspsrc_stream_setup_rtp),
(gst_rtspsrc_stream_configure_transport),
(gst_rtspsrc_combine_flows), (gst_rtspsrc_loop),
(gst_rtspsrc_open), (gst_rtspsrc_close), (gst_rtspsrc_play),
(gst_rtspsrc_pause):
* gst/rtsp/gstrtspsrc.h:
Fix interleaved mode.
- Protect streaming with lock.
- Combine flows
- set caps on outgoing buffers.
- strip trailing \0 from data packets.
- Configure RTP/RTCP in stream.
Use DEBUG_OBJECT more.
2006-08-16 Wim Taymans <wim@fluendo.com>
* gst/udp/gstmultiudpsink.c: (gst_multiudpsink_add):

View file

@ -170,16 +170,16 @@ gst_rtpdec_init (GstRTPDec * rtpdec)
rtpdec->sink_rtp =
gst_pad_new_from_static_template (&gst_rtpdec_sink_rtp_template,
"sinkrtp");
gst_element_add_pad (GST_ELEMENT (rtpdec), rtpdec->sink_rtp);
gst_pad_set_getcaps_function (rtpdec->sink_rtp, gst_rtpdec_getcaps);
gst_pad_set_chain_function (rtpdec->sink_rtp, gst_rtpdec_chain_rtp);
gst_element_add_pad (GST_ELEMENT (rtpdec), rtpdec->sink_rtp);
/* the input rtcp pad */
rtpdec->sink_rtcp =
gst_pad_new_from_static_template (&gst_rtpdec_sink_rtcp_template,
"sinkrtcp");
gst_element_add_pad (GST_ELEMENT (rtpdec), rtpdec->sink_rtcp);
gst_pad_set_chain_function (rtpdec->sink_rtcp, gst_rtpdec_chain_rtcp);
gst_element_add_pad (GST_ELEMENT (rtpdec), rtpdec->sink_rtcp);
/* the output rtp pad */
rtpdec->src_rtp =
@ -203,7 +203,7 @@ gst_rtpdec_getcaps (GstPad * pad)
src = GST_RTPDEC (GST_PAD_PARENT (pad));
other = pad == src->src_rtp ? src->sink_rtp : src->src_rtp;
other = (pad == src->src_rtp ? src->sink_rtp : src->src_rtp);
caps = gst_pad_peer_get_caps (other);

View file

@ -135,6 +135,7 @@ gst_rtsp_proto_get_type (void)
static void gst_rtspsrc_base_init (gpointer g_class);
static void gst_rtspsrc_class_init (GstRTSPSrc * klass);
static void gst_rtspsrc_init (GstRTSPSrc * rtspsrc);
static void gst_rtspsrc_finalize (GObject * object);
static void gst_rtspsrc_uri_handler_init (gpointer g_iface,
gpointer iface_data);
@ -216,6 +217,8 @@ gst_rtspsrc_class_init (GstRTSPSrc * klass)
gobject_class->set_property = gst_rtspsrc_set_property;
gobject_class->get_property = gst_rtspsrc_get_property;
gobject_class->finalize = gst_rtspsrc_finalize;
g_object_class_install_property (gobject_class, PROP_LOCATION,
g_param_spec_string ("location", "RTSP Location",
"Location of the RTSP url to read",
@ -243,6 +246,21 @@ gst_rtspsrc_class_init (GstRTSPSrc * klass)
static void
gst_rtspsrc_init (GstRTSPSrc * src)
{
src->stream_rec_lock = g_new (GStaticRecMutex, 1);
g_static_rec_mutex_init (src->stream_rec_lock);
}
static void
gst_rtspsrc_finalize (GObject * object)
{
GstRTSPSrc *rtspsrc;
rtspsrc = GST_RTSPSRC (object);
g_static_rec_mutex_free (rtspsrc->stream_rec_lock);
g_free (rtspsrc->stream_rec_lock);
G_OBJECT_CLASS (parent_class)->finalize (object);
}
static void
@ -314,6 +332,22 @@ gst_rtspsrc_create_stream (GstRTSPSrc * src)
return s;
}
#if 0
static void
gst_rtspsrc_free_stream (GstRTSPSrc * src, GstRTSPStream * stream)
{
if (stream->caps) {
gst_caps_unref (stream->caps);
stream->caps = NULL;
}
src->streams = g_list_remove (src->streams, stream);
src->numstreams--;
g_free (stream);
}
#endif
static gboolean
gst_rtspsrc_add_element (GstRTSPSrc * src, GstElement * element)
{
@ -437,7 +471,7 @@ gst_rtspsrc_parse_rtpmap (gchar * rtpmap, gint * payload, gchar ** name,
*
* m=<media> <udp port> RTP/AVP <payload>
* a=rtpmap:<payload> <encoding_name>/<clock_rate>[/<encoding_params>]
* a=fmtp:<payload> <param>=<value>;...
* a=fmtp:<payload> <param>[=<value>];...
*/
static GstCaps *
gst_rtspsrc_media_to_caps (SDPMedia * media)
@ -502,34 +536,37 @@ gst_rtspsrc_media_to_caps (SDPMedia * media)
p = fmtp;
/* p is now of the format <payload> <param>[=<value>];... */
PARSE_INT (p, " ", payload);
if (payload != -1 && payload == pt) {
gchar **pairs;
gint i;
/* <param>[=<value>] are separated with ';' */
pairs = g_strsplit (p, ";", 0);
for (i = 0; pairs[i]; i++) {
gchar **keyval;
gchar *valpos;
gchar *val, *key;
keyval = g_strsplit (pairs[i], "=", 0);
if (keyval[0]) {
gchar *val, *key;
if (keyval[1])
val = g_strstrip (keyval[1]);
else
val = "1";
key = g_strstrip (keyval[0]);
gst_structure_set (s, key, G_TYPE_STRING, val, NULL);
/* the key may not have a '=', the value can have other '='s */
valpos = strstr (pairs[i], "=");
if (valpos) {
/* we have a '=' and thus a value, remove the '=' with \0 */
*valpos = '\0';
/* value is everything between '=' and ';' */
val = g_strstrip (valpos + 1);
} else {
/* simple <param>;.. is translated into <param>=1;... */
val = "1";
}
g_strfreev (keyval);
/* strip the key of spaces */
key = g_strstrip (pairs[i]);
gst_structure_set (s, key, G_TYPE_STRING, val, NULL);
}
g_strfreev (pairs);
}
}
return caps;
}
@ -630,32 +667,33 @@ again:
/* ERRORS */
no_udp_rtp_protocol:
{
GST_DEBUG ("could not get UDP source for RTP");
GST_DEBUG_OBJECT (src, "could not get UDP source for RTP");
goto cleanup;
}
start_rtp_failure:
{
GST_DEBUG ("could not start UDP source for RTP");
GST_DEBUG_OBJECT (src, "could not start UDP source for RTP");
goto cleanup;
}
no_ports:
{
GST_DEBUG ("could not allocate UDP port pair after %d retries", count);
GST_DEBUG_OBJECT (src, "could not allocate UDP port pair after %d retries",
count);
goto cleanup;
}
no_udp_rtcp_protocol:
{
GST_DEBUG ("could not get UDP source for RTCP");
GST_DEBUG_OBJECT (src, "could not get UDP source for RTCP");
goto cleanup;
}
start_rtcp_failure:
{
GST_DEBUG ("could not start UDP source for RTCP");
GST_DEBUG_OBJECT (src, "could not start UDP source for RTCP");
goto cleanup;
}
port_error:
{
GST_DEBUG ("ports don't match rtp: %d<->%d, rtcp: %d<->%d",
GST_DEBUG_OBJECT (src, "ports don't match rtp: %d<->%d, rtcp: %d<->%d",
tmp_rtp, *rtpport, tmp_rtcp, *rtcpport);
goto cleanup;
}
@ -679,7 +717,7 @@ cleanup:
static gboolean
gst_rtspsrc_stream_configure_transport (GstRTSPStream * stream,
RTSPTransport * transport)
SDPMedia * media, RTSPTransport * transport)
{
GstRTSPSrc *src;
GstPad *pad;
@ -688,6 +726,8 @@ gst_rtspsrc_stream_configure_transport (GstRTSPStream * stream,
src = stream->parent;
GST_DEBUG ("configuring RTP transport for stream %p", stream);
if (!(stream->rtpdec = gst_element_factory_make ("rtpdec", NULL)))
goto no_element;
@ -706,6 +746,13 @@ gst_rtspsrc_stream_configure_transport (GstRTSPStream * stream,
/* configure for interleaved delivery, nothing needs to be done
* here, the loop function will call the chain functions of the
* rtp session manager. */
stream->rtpchannel = transport->interleaved.min;
stream->rtcpchannel = transport->interleaved.max;
GST_DEBUG ("stream %p on channels %d-%d", stream,
stream->rtpchannel, stream->rtcpchannel);
/* also store the caps in the stream */
stream->caps = gst_rtspsrc_media_to_caps (media);
} else {
/* configure for UDP delivery, we need to connect the udp pads to
* the rtp session plugin. */
@ -719,6 +766,10 @@ gst_rtspsrc_stream_configure_transport (GstRTSPStream * stream,
}
pad = gst_element_get_pad (stream->rtpdec, "srcrtp");
if (stream->caps) {
gst_pad_use_fixed_caps (pad);
gst_pad_set_caps (pad, stream->caps);
}
name = g_strdup_printf ("rtp_stream%d", stream->id);
gst_element_add_pad (GST_ELEMENT_CAST (src), gst_ghost_pad_new (name, pad));
g_free (name);
@ -726,14 +777,15 @@ gst_rtspsrc_stream_configure_transport (GstRTSPStream * stream,
return TRUE;
/* ERRORS */
no_element:
{
GST_DEBUG ("no rtpdec element found");
GST_DEBUG_OBJECT (src, "no rtpdec element found");
return FALSE;
}
start_rtpdec_failure:
{
GST_DEBUG ("could not start RTP session");
GST_DEBUG_OBJECT (src, "could not start RTP session");
return FALSE;
}
}
@ -749,6 +801,40 @@ find_stream (GstRTSPStream * stream, gconstpointer a)
return -1;
}
static GstFlowReturn
gst_rtspsrc_combine_flows (GstRTSPSrc * src, GstRTSPStream * stream,
GstFlowReturn ret)
{
GList *streams;
/* store the value */
stream->last_ret = ret;
/* if it's success we can return the value right away */
if (GST_FLOW_IS_SUCCESS (ret))
goto done;
/* any other error that is not-linked can be returned right
* away */
if (ret != GST_FLOW_NOT_LINKED)
goto done;
/* only return NOT_LINKED if all other pads returned NOT_LINKED */
for (streams = src->streams; streams; streams = g_list_next (streams)) {
GstRTSPStream *ostream = (GstRTSPStream *) streams->data;
ret = ostream->last_ret;
/* some other return value (must be SUCCESS but we can return
* other values as well) */
if (ret != GST_FLOW_NOT_LINKED)
goto done;
}
/* if we get here, all other pads were unlinked and we return
* NOT_LINKED then */
done:
return ret;
}
static void
gst_rtspsrc_loop (GstRTSPSrc * src)
{
@ -760,12 +846,14 @@ gst_rtspsrc_loop (GstRTSPSrc * src)
GstPad *outpad = NULL;
guint8 *data;
guint size;
GstFlowReturn ret = GST_FLOW_OK;
GstCaps *caps = NULL;
do {
GST_DEBUG ("doing reveive");
GST_DEBUG_OBJECT (src, "doing receive");
if ((res = rtsp_connection_receive (src->connection, &response)) < 0)
goto receive_error;
GST_DEBUG ("got packet");
GST_DEBUG_OBJECT (src, "got packet type %d", response.type);
}
while (response.type != RTSP_MESSAGE_DATA);
@ -777,10 +865,12 @@ gst_rtspsrc_loop (GstRTSPSrc * src)
goto unknown_stream;
stream = (GstRTSPStream *) lstream->data;
if (channel == stream->rtpchannel)
if (channel == stream->rtpchannel) {
outpad = stream->rtpdecrtp;
else if (channel == stream->rtcpchannel)
caps = stream->caps;
} else if (channel == stream->rtcpchannel) {
outpad = stream->rtpdecrtcp;
}
rtsp_message_get_body (&response, &data, &size);
@ -798,21 +888,39 @@ gst_rtspsrc_loop (GstRTSPSrc * src)
{
GstBuffer *buf;
/* strip the trailing \0 */
size -= 1;
buf = gst_buffer_new_and_alloc (size);
memcpy (GST_BUFFER_DATA (buf), data, size);
if (gst_pad_chain (outpad, buf) != GST_FLOW_OK)
if (caps)
gst_buffer_set_caps (buf, caps);
GST_DEBUG_OBJECT (src, "pushing data of size %d on channel %d", size,
channel);
/* chain to the peer pad */
ret = gst_pad_chain (outpad, buf);
/* combine all streams */
ret = gst_rtspsrc_combine_flows (src, stream, ret);
if (ret != GST_FLOW_OK)
goto need_pause;
}
unknown_stream:
return;
/* ERRORS */
unknown_stream:
{
GST_DEBUG_OBJECT (src, "unknown stream on channel %d, ignored", channel);
return;
}
receive_error:
{
GST_ELEMENT_ERROR (src, RESOURCE, WRITE,
("Could not receive message."), (NULL));
ret = GST_FLOW_UNEXPECTED;
/*
gst_pad_push_event (src->srcpad, gst_event_new (GST_EVENT_EOS));
*/
@ -820,6 +928,8 @@ receive_error:
}
need_pause:
{
GST_DEBUG_OBJECT (src, "pausing task, reason %d (%s)", ret,
gst_flow_get_name (ret));
gst_task_pause (src->task);
return;
}
@ -886,24 +996,24 @@ gst_rtspsrc_open (GstRTSPSrc * src)
GstRTSPProto protocols;
/* parse url */
GST_DEBUG ("parsing url...");
GST_DEBUG_OBJECT (src, "parsing url...");
if ((res = rtsp_url_parse (src->location, &url)) < 0)
goto invalid_url;
/* open connection */
GST_DEBUG ("opening connection...");
GST_DEBUG_OBJECT (src, "opening connection...");
if ((res = rtsp_connection_open (url, &src->connection)) < 0)
goto could_not_open;
/* create OPTIONS */
GST_DEBUG ("create options...");
GST_DEBUG_OBJECT (src, "create options...");
if ((res =
rtsp_message_init_request (RTSP_OPTIONS, src->location,
&request)) < 0)
goto create_request_failed;
/* send OPTIONS */
GST_DEBUG ("send options...");
GST_DEBUG_OBJECT (src, "send options...");
if (!gst_rtspsrc_send (src, &request, &response, NULL))
goto send_error;
@ -955,7 +1065,7 @@ gst_rtspsrc_open (GstRTSPSrc * src)
}
/* create DESCRIBE */
GST_DEBUG ("create describe...");
GST_DEBUG_OBJECT (src, "create describe...");
if ((res =
rtsp_message_init_request (RTSP_DESCRIBE, src->location,
&request)) < 0)
@ -964,7 +1074,7 @@ gst_rtspsrc_open (GstRTSPSrc * src)
rtsp_message_add_header (&request, RTSP_HDR_ACCEPT, "application/sdp");
/* send DESCRIBE */
GST_DEBUG ("send describe...");
GST_DEBUG_OBJECT (src, "send describe...");
if (!gst_rtspsrc_send (src, &request, &response, NULL))
goto send_error;
@ -984,7 +1094,7 @@ gst_rtspsrc_open (GstRTSPSrc * src)
/* parse SDP */
rtsp_message_get_body (&response, &data, &size);
GST_DEBUG ("parse sdp...");
GST_DEBUG_OBJECT (src, "parse sdp...");
sdp_message_init (&sdp);
sdp_message_parse_buffer (data, size, &sdp);
@ -1008,10 +1118,10 @@ gst_rtspsrc_open (GstRTSPSrc * src)
stream = gst_rtspsrc_create_stream (src);
GST_DEBUG ("setup media %d", i);
GST_DEBUG_OBJECT (src, "setup media %d", i);
control_url = sdp_media_get_attribute_val (media, "control");
if (control_url == NULL) {
GST_DEBUG ("no control url found, skipping stream");
GST_DEBUG_OBJECT (src, "no control url found, skipping stream");
continue;
}
@ -1023,7 +1133,7 @@ gst_rtspsrc_open (GstRTSPSrc * src)
setup_url = g_strdup_printf ("%s/%s", src->location, control_url);
}
GST_DEBUG ("setup %s", setup_url);
GST_DEBUG_OBJECT (src, "setup %s", setup_url);
/* create SETUP request */
if ((res =
rtsp_message_init_request (RTSP_SETUP, setup_url,
@ -1043,6 +1153,8 @@ gst_rtspsrc_open (GstRTSPSrc * src)
if (!gst_rtspsrc_stream_setup_rtp (stream, media, &rtpport, &rtcpport))
goto setup_rtp_failed;
GST_DEBUG_OBJECT (src, "setting up RTP ports %d-%d", rtpport, rtcpport);
trxparams = g_strdup_printf ("client_port=%d-%d", rtpport, rtcpport);
new = g_strconcat (transports, "RTP/AVP/UDP;unicast;", trxparams, NULL);
g_free (trxparams);
@ -1052,6 +1164,8 @@ gst_rtspsrc_open (GstRTSPSrc * src)
if (protocols & GST_RTSP_PROTO_UDP_MULTICAST) {
gchar *new;
GST_DEBUG_OBJECT (src, "setting up MULTICAST");
new =
g_strconcat (transports, transports[0] ? "," : "",
"RTP/AVP/UDP;multicast", NULL);
@ -1061,6 +1175,8 @@ gst_rtspsrc_open (GstRTSPSrc * src)
if (protocols & GST_RTSP_PROTO_TCP) {
gchar *new;
GST_DEBUG_OBJECT (src, "setting up TCP");
new =
g_strconcat (transports, transports[0] ? "," : "", "RTP/AVP/TCP",
NULL);
@ -1088,20 +1204,24 @@ gst_rtspsrc_open (GstRTSPSrc * src)
rtsp_transport_parse (resptrans, &transport);
/* update allowed transports for other streams */
if (transport.lower_transport == RTSP_LOWER_TRANS_TCP) {
GST_DEBUG_OBJECT (src, "stream %d as TCP", i);
protocols = GST_RTSP_PROTO_TCP;
src->interleaved = TRUE;
} else {
if (transport.multicast) {
/* disable unicast */
GST_DEBUG_OBJECT (src, "stream %d as MULTICAST", i);
protocols = GST_RTSP_PROTO_UDP_MULTICAST;
} else {
/* disable multicast */
GST_DEBUG_OBJECT (src, "stream %d as UNICAST", i);
protocols = GST_RTSP_PROTO_UDP_UNICAST;
}
}
/* now configure the stream with the transport */
if (!gst_rtspsrc_stream_configure_transport (stream, &transport)) {
GST_DEBUG ("could not configure stream transport, skipping stream");
if (!gst_rtspsrc_stream_configure_transport (stream, media, &transport)) {
GST_DEBUG_OBJECT (src,
"could not configure stream transport, skipping stream");
}
/* clean up our transport struct */
rtsp_transport_init (&transport);
@ -1173,11 +1293,20 @@ gst_rtspsrc_close (GstRTSPSrc * src)
RTSPMessage response = { 0 };
RTSPResult res;
GST_DEBUG ("TEARDOWN...");
GST_DEBUG_OBJECT (src, "TEARDOWN...");
/* stop task if any */
if (src->task) {
gst_task_stop (src->task);
/* make sure it is not running */
g_static_rec_mutex_lock (src->stream_rec_lock);
g_static_rec_mutex_unlock (src->stream_rec_lock);
/* no wait for the task to finish */
gst_task_join (src->task);
/* and free the task */
gst_object_unref (GST_OBJECT (src->task));
src->task = NULL;
}
@ -1194,7 +1323,7 @@ gst_rtspsrc_close (GstRTSPSrc * src)
}
/* close connection */
GST_DEBUG ("closing connection...");
GST_DEBUG_OBJECT (src, "closing connection...");
if ((res = rtsp_connection_close (src->connection)) < 0)
goto close_failed;
@ -1229,7 +1358,7 @@ gst_rtspsrc_play (GstRTSPSrc * src)
if (!(src->options & RTSP_PLAY))
return TRUE;
GST_DEBUG ("PLAY...");
GST_DEBUG_OBJECT (src, "PLAY...");
/* do play */
if ((res =
@ -1242,6 +1371,7 @@ gst_rtspsrc_play (GstRTSPSrc * src)
if (src->interleaved) {
src->task = gst_task_create ((GstTaskFunction) gst_rtspsrc_loop, src);
gst_task_set_lock (src->task, src->stream_rec_lock);
gst_task_start (src->task);
}
@ -1271,7 +1401,7 @@ gst_rtspsrc_pause (GstRTSPSrc * src)
if (!(src->options & RTSP_PAUSE))
return TRUE;
GST_DEBUG ("PAUSE...");
GST_DEBUG_OBJECT (src, "PAUSE...");
/* do pause */
if ((res =
rtsp_message_init_request (RTSP_PAUSE, src->location, &request)) < 0)

View file

@ -55,19 +55,23 @@ typedef struct _GstRTSPStream GstRTSPStream;
struct _GstRTSPStream {
gint id;
gint rtpchannel;
gint rtcpchannel;
GstRTSPSrc *parent;
/* our udp sources */
GstFlowReturn last_ret;
/* for interleaved mode */
gint rtpchannel;
gint rtcpchannel;
GstCaps *caps;
/* our udp sources for RTP */
GstElement *rtpsrc;
GstElement *rtcpsrc;
/* our udp sink back to the server */
GstElement *rtcpsink;
/* the rtp decoder */
/* the RTP decoder */
GstElement *rtpdec;
GstPad *rtpdecrtp;
GstPad *rtpdecrtcp;
@ -76,23 +80,25 @@ struct _GstRTSPStream {
struct _GstRTSPSrc {
GstElement element;
gboolean interleaved;
GstTask *task;
/* task and mutex for interleaved mode */
gboolean interleaved;
GstTask *task;
GStaticRecMutex *stream_rec_lock;
gint numstreams;
GList *streams;
gint numstreams;
GList *streams;
gchar *location;
gboolean debug;
guint retry;
gchar *location;
gboolean debug;
guint retry;
GstRTSPProto protocols;
GstRTSPProto protocols;
/* supported options */
gint options;
gint options;
RTSPConnection *connection;
RTSPMessage *request;
RTSPMessage *response;
RTSPConnection *connection;
RTSPMessage *request;
RTSPMessage *response;
};
struct _GstRTSPSrcClass {