gstreamer/gst/sdp/gstsdpdemux.c
Wim Taymans e9eae335f1 sdpdemux: rework RTCP sending and RTP receiving
When we are dealing with multicast, create the udp src and sink elements pointing
to the multicast addresses. When we are doing unicast, receive data on the local
ports and don't send RTCP because we don't know where we have to send it.

Fixes #583188
2009-05-21 22:22:06 +02:00

1410 lines
38 KiB
C

/* GStreamer
* Copyright (C) <2007> Wim Taymans <wim dot taymans at gmail dot com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 59 Temple Place - Suite 330,
* Boston, MA 02111-1307, USA.
*/
/**
* SECTION:element-sdpdemux
*
* sdpdemux currently understands SDP as the input format of the session description.
* For each stream listed in the SDP a new rtp_stream%d pad will be created
* with caps derived from the SDP media description. This is a caps of mime type
* "application/x-rtp" that can be connected to any available RTP depayloader
* element.
*
* sdpdemux will internally instantiate an RTP session manager element
* that will handle the RTCP messages to and from the server, jitter removal,
* packet reordering along with providing a clock for the pipeline.
*
* sdpdemux acts like a live element and will therefore only generate data in the
* PLAYING state.
*
* <refsect2>
* <title>Example launch line</title>
* |[
* gst-launch gnomevfssrc location=http://some.server/session.sdp ! sdpdemux ! fakesink
* ]| Establish a connection to an HTTP server that contains an SDP session description
* that gets parsed by sdpdemux and send the raw RTP packets to a fakesink.
* </refsect2>
*
* Last reviewed on 2007-10-01 (0.10.6)
*/
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif
#ifdef G_OS_WIN32
#ifdef _MSC_VER
#include <Winsock2.h>
#endif
/* ws2_32.dll has getaddrinfo and freeaddrinfo on Windows XP and later.
 * mingw32 headers check WINVER before allowing the use of these */
#ifndef WINVER
#define WINVER 0x0501
#endif
#include <ws2tcpip.h>
#else
#include <sys/socket.h>
#include <netdb.h>
#include <netinet/in.h>
#endif
#include <stdlib.h>
#include <string.h>
#include <locale.h>
#include <stdio.h>
#include <stdarg.h>
#include <gst/rtp/gstrtppayloads.h>
#include <gst/sdp/gstsdpmessage.h>
#include "gstsdpdemux.h"
/* debug category of this file; GST_CAT_DEFAULT makes the GST_DEBUG* macros
 * below log into it */
GST_DEBUG_CATEGORY_STATIC (sdpdemux_debug);
#define GST_CAT_DEFAULT (sdpdemux_debug)
/* elementfactory information */
static const GstElementDetails gst_sdp_demux_details =
GST_ELEMENT_DETAILS ("SDP session setup",
"Codec/Demuxer/Network/RTP",
"Receive data over the network via SDP",
"Wim Taymans <wim.taymans@gmail.com>");
/* always-present sink pad on which the SDP text (application/sdp) arrives */
static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
GST_PAD_SINK,
GST_PAD_ALWAYS,
GST_STATIC_CAPS ("application/sdp"));
/* template of the sometimes source pads that are ghosted from the session
 * manager in new_session_pad(); they carry application/x-rtp */
static GstStaticPadTemplate rtptemplate = GST_STATIC_PAD_TEMPLATE ("stream%d",
GST_PAD_SRC,
GST_PAD_SOMETIMES,
GST_STATIC_CAPS ("application/x-rtp"));
/* signal ids; no signals are defined, so only the LAST_SIGNAL terminator
 * exists */
enum
{
/* FILL ME */
LAST_SIGNAL
};
/* property defaults: the "timeout" property is expressed in microseconds
 * and the "latency" property in milliseconds (see the property blurbs in
 * gst_sdp_demux_class_init()) */
#define DEFAULT_DEBUG FALSE
#define DEFAULT_TIMEOUT 10000000
#define DEFAULT_LATENCY_MS 200
/* GObject property ids, dispatched on in gst_sdp_demux_set_property() and
 * gst_sdp_demux_get_property(); PROP_0 is the unused zero id */
enum
{
PROP_0,
PROP_DEBUG,
PROP_TIMEOUT,
PROP_LATENCY
};
/* forward declarations of the functions that are referenced before their
 * definition further down in this file */
/* GObject / GstElement class plumbing */
static void gst_sdp_demux_base_init (gpointer g_class);
static void gst_sdp_demux_finalize (GObject * object);
static void gst_sdp_demux_set_property (GObject * object, guint prop_id,
const GValue * value, GParamSpec * pspec);
static void gst_sdp_demux_get_property (GObject * object, guint prop_id,
GValue * value, GParamSpec * pspec);
/* builds application/x-rtp caps from an SDP media section */
static GstCaps *gst_sdp_demux_media_to_caps (gint pt,
const GstSDPMedia * media);
/* element and bin virtual methods */
static GstStateChangeReturn gst_sdp_demux_change_state (GstElement * element,
GstStateChange transition);
static void gst_sdp_demux_handle_message (GstBin * bin, GstMessage * message);
/* sends an event to the session manager pads of one stream */
static void gst_sdp_demux_stream_push_event (GstSDPDemux * demux,
GstSDPStream * stream, GstEvent * event);
/* sink pad functions */
static gboolean gst_sdp_demux_sink_event (GstPad * pad, GstEvent * event);
static GstFlowReturn gst_sdp_demux_sink_chain (GstPad * pad,
GstBuffer * buffer);
/*static guint gst_sdp_demux_signals[LAST_SIGNAL] = { 0 }; */
/* Extra type-initialisation code handed to GST_BOILERPLATE_FULL: it only
 * registers the "sdpdemux" debug category. The type argument is unused. */
static void
_do_init (GType sdp_demux_type)
{
  GST_DEBUG_CATEGORY_INIT (sdpdemux_debug, "sdpdemux", 0, "SDP demux");
}
/* declares the GstSDPDemux type with GstBin as its parent type; _do_init is
 * passed as the additional type-initialisation code */
GST_BOILERPLATE_FULL (GstSDPDemux, gst_sdp_demux, GstBin, GST_TYPE_BIN,
_do_init);
/* Base-class initialisation: installs the sink and RTP source pad templates
 * and the element details on the element class. */
static void
gst_sdp_demux_base_init (gpointer g_class)
{
  GstElementClass *eklass = GST_ELEMENT_CLASS (g_class);
  GstPadTemplate *tmpl;

  tmpl = gst_static_pad_template_get (&sinktemplate);
  gst_element_class_add_pad_template (eklass, tmpl);

  tmpl = gst_static_pad_template_get (&rtptemplate);
  gst_element_class_add_pad_template (eklass, tmpl);

  gst_element_class_set_details (eklass, &gst_sdp_demux_details);
}
/* Class initialisation: hooks up the GObject, GstElement and GstBin virtual
 * methods and installs the "debug", "timeout" and "latency" properties. */
static void
gst_sdp_demux_class_init (GstSDPDemuxClass * klass)
{
  GObjectClass *object_klass = (GObjectClass *) klass;
  GstElementClass *element_klass = (GstElementClass *) klass;
  GstBinClass *bin_klass = (GstBinClass *) klass;
  GParamSpec *pspec;

  object_klass->set_property = gst_sdp_demux_set_property;
  object_klass->get_property = gst_sdp_demux_get_property;
  object_klass->finalize = gst_sdp_demux_finalize;

  pspec = g_param_spec_boolean ("debug", "Debug",
      "Dump request and response messages to stdout",
      DEFAULT_DEBUG, G_PARAM_READWRITE | G_PARAM_CONSTRUCT);
  g_object_class_install_property (object_klass, PROP_DEBUG, pspec);

  pspec = g_param_spec_uint64 ("timeout", "Timeout",
      "Fail transport after UDP timeout microseconds (0 = disabled)",
      0, G_MAXUINT64, DEFAULT_TIMEOUT,
      G_PARAM_READWRITE | G_PARAM_CONSTRUCT);
  g_object_class_install_property (object_klass, PROP_TIMEOUT, pspec);

  pspec = g_param_spec_uint ("latency", "Buffer latency in ms",
      "Amount of ms to buffer", 0, G_MAXUINT, DEFAULT_LATENCY_MS,
      G_PARAM_READWRITE | G_PARAM_CONSTRUCT);
  g_object_class_install_property (object_klass, PROP_LATENCY, pspec);

  element_klass->change_state = gst_sdp_demux_change_state;
  bin_klass->handle_message = gst_sdp_demux_handle_message;
}
/* Instance initialisation: creates the sink pad with its event and chain
 * functions, allocates the recursive stream lock and creates the adapter
 * in which the incoming SDP data is collected. */
static void
gst_sdp_demux_init (GstSDPDemux * demux, GstSDPDemuxClass * g_class)
{
  GstPad *sink;

  sink = gst_pad_new_from_static_template (&sinktemplate, "sink");
  demux->sinkpad = sink;
  gst_pad_set_event_function (sink,
      GST_DEBUG_FUNCPTR (gst_sdp_demux_sink_event));
  gst_pad_set_chain_function (sink,
      GST_DEBUG_FUNCPTR (gst_sdp_demux_sink_chain));
  gst_element_add_pad (GST_ELEMENT (demux), sink);

  /* protects the streaming thread in interleaved mode or the polling
   * thread in UDP mode. */
  demux->stream_rec_lock = g_new (GStaticRecMutex, 1);
  g_static_rec_mutex_init (demux->stream_rec_lock);

  demux->adapter = gst_adapter_new ();
}
/* Finalizer: releases the recursive stream lock and the adapter that were
 * created in gst_sdp_demux_init(), then chains up to the parent class. */
static void
gst_sdp_demux_finalize (GObject * object)
{
  GstSDPDemux *self = GST_SDP_DEMUX (object);

  /* the lock was heap allocated, so free both the mutex and its storage */
  g_static_rec_mutex_free (self->stream_rec_lock);
  g_free (self->stream_rec_lock);

  g_object_unref (self->adapter);

  G_OBJECT_CLASS (parent_class)->finalize (object);
}
/* GObject property setter: stores the "debug", "timeout" and "latency"
 * values in the instance; unknown ids produce the standard warning. */
static void
gst_sdp_demux_set_property (GObject * object, guint prop_id,
    const GValue * value, GParamSpec * pspec)
{
  GstSDPDemux *self = GST_SDP_DEMUX (object);

  switch (prop_id) {
    case PROP_LATENCY:
      self->latency = g_value_get_uint (value);
      break;
    case PROP_TIMEOUT:
      self->udp_timeout = g_value_get_uint64 (value);
      break;
    case PROP_DEBUG:
      self->debug = g_value_get_boolean (value);
      break;
    default:
      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
      break;
  }
}
/* GObject property getter: copies the "debug", "timeout" and "latency"
 * values from the instance into @value; unknown ids produce the standard
 * warning. */
static void
gst_sdp_demux_get_property (GObject * object, guint prop_id, GValue * value,
    GParamSpec * pspec)
{
  GstSDPDemux *self = GST_SDP_DEMUX (object);

  switch (prop_id) {
    case PROP_LATENCY:
      g_value_set_uint (value, self->latency);
      break;
    case PROP_TIMEOUT:
      g_value_set_uint64 (value, self->udp_timeout);
      break;
    case PROP_DEBUG:
      g_value_set_boolean (value, self->debug);
      break;
    default:
      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
      break;
  }
}
/* Compare callback for find_stream(): @a carries a stream id packed with
 * GINT_TO_POINTER. Returns 0 when @stream has that id, -1 otherwise. */
static gint
find_stream_by_id (GstSDPStream * stream, gconstpointer a)
{
  return (stream->id == GPOINTER_TO_INT (a)) ? 0 : -1;
}
/* Compare callback for find_stream(): @a carries a payload type packed with
 * GINT_TO_POINTER. Returns 0 when @stream uses that payload type, -1
 * otherwise. */
static gint
find_stream_by_pt (GstSDPStream * stream, gconstpointer a)
{
  return (stream->pt == GPOINTER_TO_INT (a)) ? 0 : -1;
}
/* Compare callback for find_stream(): @a is an element pointer. Returns 0
 * when it is one of the two UDP sources of @stream, -1 otherwise. */
static gint
find_stream_by_udpsrc (GstSDPStream * stream, gconstpointer a)
{
  GstElement *wanted = (GstElement *) a;
  gint n;

  for (n = 0; n < 2; n++) {
    if (stream->udpsrc[n] == wanted)
      return 0;
  }
  return -1;
}
/* Looks up a stream in demux->streams. @func is one of the find_stream_by_*
 * compare callbacks and @data is the key handed to it. Returns the first
 * stream for which the callback returns 0, or NULL when none matches. */
GstSDPStream *
find_stream (GstSDPDemux * demux, gconstpointer data, gconstpointer func)
{
  GList *node;

  node = g_list_find_custom (demux->streams, data, (GCompareFunc) func);
  if (node == NULL)
    return NULL;

  return (GstSDPStream *) node->data;
}
/* Tears down one stream: drops its caps, shuts down and removes its UDP
 * sources and UDP sink from the bin, deactivates its source pad (removing
 * it from the element when it had been added) and frees the structure.
 * The stream is not unlinked from demux->streams here. */
static void
gst_sdp_demux_stream_free (GstSDPDemux * demux, GstSDPStream * stream)
{
  GstBin *bin = GST_BIN_CAST (demux);
  gint n;

  GST_DEBUG_OBJECT (demux, "free stream %p", stream);

  if (stream->caps != NULL)
    gst_caps_unref (stream->caps);

  /* both UDP sources: index 0 and index 1 */
  for (n = 0; n < 2; n++) {
    GstElement *src = stream->udpsrc[n];

    if (src == NULL)
      continue;

    gst_element_set_state (src, GST_STATE_NULL);
    gst_bin_remove (bin, src);
    stream->udpsrc[n] = NULL;
  }

  if (stream->udpsink != NULL) {
    gst_element_set_state (stream->udpsink, GST_STATE_NULL);
    gst_bin_remove (bin, stream->udpsink);
    stream->udpsink = NULL;
  }

  if (stream->srcpad != NULL) {
    gst_pad_set_active (stream->srcpad, FALSE);
    /* only pads that were exposed on the element need removing */
    if (stream->added) {
      gst_element_remove_pad (GST_ELEMENT_CAST (demux), stream->srcpad);
      stream->added = FALSE;
    }
    stream->srcpad = NULL;
  }

  g_free (stream);
}
/* Resolves @host_name and reports whether any of the resulting addresses is
 * an IPv4 or IPv6 multicast address.
 *
 * Returns FALSE when @host_name is NULL, when resolution fails or when no
 * resolved address is multicast. */
static gboolean
is_multicast_address (const gchar * host_name)
{
  struct addrinfo hints;
  struct addrinfo *ai;
  struct addrinfo *res = NULL;
  gboolean ret = FALSE;

  g_return_val_if_fail (host_name, FALSE);

  memset (&hints, 0, sizeof (hints));
  hints.ai_socktype = SOCK_DGRAM;

  /* getaddrinfo() signals failure with any non-zero value; the error codes
   * are not negative on every platform, so "< 0" would miss failures and
   * leave res unusable. */
  if (getaddrinfo (host_name, NULL, &hints, &res) != 0)
    return FALSE;

  /* stop at the first multicast address */
  for (ai = res; !ret && ai; ai = ai->ai_next) {
    if (ai->ai_addr == NULL)
      continue;

    switch (ai->ai_family) {
      case AF_INET:
        ret =
            IN_MULTICAST (ntohl (((struct sockaddr_in *) ai->ai_addr)->
                sin_addr.s_addr)) ? TRUE : FALSE;
        break;
      case AF_INET6:
        ret =
            IN6_IS_ADDR_MULTICAST (&((struct sockaddr_in6 *) ai->
                ai_addr)->sin6_addr) ? TRUE : FALSE;
        break;
      default:
        /* other families must not be read through sockaddr_in6 */
        break;
    }
  }
  freeaddrinfo (res);

  return ret;
}
/* Creates the stream bookkeeping for media section @idx of @sdp and appends
 * it to demux->streams.
 *
 * The payload type and caps come from the first format of the media; the
 * destination address and TTL come from the first media-level connection
 * or, failing that, the session-level connection. The RTCP port is always
 * the RTP port + 1.
 *
 * Returns the new stream, or NULL when the media does not exist or no
 * connection information is available. stream->destination points into
 * @sdp and is not copied. */
static GstSDPStream *
gst_sdp_demux_create_stream (GstSDPDemux * demux, GstSDPMessage * sdp, gint idx)
{
  const GstSDPMedia *media = gst_sdp_message_get_media (sdp, idx);
  const GstSDPConnection *conn;
  const gchar *payload, *rtcp;
  GstSDPStream *stream;

  /* get media, should not return NULL */
  if (media == NULL)
    return NULL;

  stream = g_new0 (GstSDPStream, 1);
  stream->parent = demux;
  /* start with a neutral flow state and all flags cleared */
  stream->last_ret = GST_FLOW_OK;
  stream->added = FALSE;
  stream->disabled = FALSE;
  stream->eos = FALSE;
  stream->id = demux->numstreams++;

  /* we must have a payload. No payload means we cannot create caps */
  /* FIXME, handle multiple formats. */
  payload = gst_sdp_media_get_format (media, 0);
  if (payload != NULL) {
    stream->pt = atoi (payload);
    /* convert caps */
    stream->caps = gst_sdp_demux_media_to_caps (stream->pt, media);

    /* A dynamic payload type that an earlier stream already uses means both
     * belong to the same container, so only one pad is needed. */
    if (stream->pt >= 96 &&
        find_stream (demux, GINT_TO_POINTER (stream->pt),
            (gpointer) find_stream_by_pt) != NULL)
      stream->container = TRUE;
  }

  /* media-level connection first, session-level as fallback */
  conn = gst_sdp_media_get_connection (media, 0);
  if (conn == NULL)
    conn = gst_sdp_message_get_connection (sdp);
  if (conn == NULL)
    goto no_connection;

  stream->destination = conn->address;
  stream->ttl = conn->ttl;
  stream->multicast = is_multicast_address (stream->destination);

  stream->rtp_port = gst_sdp_media_get_port (media);
  /* FIXME, RFC 3605: the a=rtcp attribute is looked up but its value is not
   * parsed; with or without it RTCP is expected on the port after RTP. */
  rtcp = gst_sdp_media_get_attribute_val (media, "rtcp");
  (void) rtcp;
  stream->rtcp_port = stream->rtp_port + 1;

  GST_DEBUG_OBJECT (demux, "stream %d, (%p)", stream->id, stream);
  GST_DEBUG_OBJECT (demux, " pt: %d", stream->pt);
  GST_DEBUG_OBJECT (demux, " container: %d", stream->container);
  GST_DEBUG_OBJECT (demux, " caps: %" GST_PTR_FORMAT, stream->caps);

  /* we keep track of all streams */
  demux->streams = g_list_append (demux->streams, stream);

  return stream;

  /* ERRORS */
no_connection:
  {
    gst_sdp_demux_stream_free (demux, stream);
    return NULL;
  }
}
/* Releases everything that was set up for the current session: frees all
 * streams, empties the stream list, disconnects the "pad-added" handler,
 * shuts down and removes the session manager and resets the stream
 * counter. */
static void
gst_sdp_demux_cleanup (GstSDPDemux * demux)
{
  GList *item = demux->streams;

  GST_DEBUG_OBJECT (demux, "cleanup");

  while (item != NULL) {
    gst_sdp_demux_stream_free (demux, (GstSDPStream *) item->data);
    item = g_list_next (item);
  }
  g_list_free (demux->streams);
  demux->streams = NULL;

  if (demux->session != NULL) {
    if (demux->session_sig_id != 0) {
      g_signal_handler_disconnect (demux->session, demux->session_sig_id);
      demux->session_sig_id = 0;
    }
    gst_element_set_state (demux->session, GST_STATE_NULL);
    gst_bin_remove (GST_BIN_CAST (demux), demux->session);
    demux->session = NULL;
  }

  demux->numstreams = 0;
}
/* PARSE_INT (p, del, res): searches for the delimiter string del in the
 * writable string p. When found, the first byte of the delimiter is
 * overwritten with '\0', p is moved one byte past it and res receives
 * atoi() of the text that preceded the delimiter. When not found, res is
 * set to -1 and p is left NULL, so p must not be used afterwards. */
#define PARSE_INT(p, del, res) \
G_STMT_START { \
gchar *t = p; \
p = strstr (p, del); \
if (p == NULL) \
res = -1; \
else { \
*p = '\0'; \
p++; \
res = atoi (t); \
} \
} G_STMT_END
/* PARSE_STRING (p, del, res): like PARSE_INT but res receives a pointer to
 * the (now terminated) text in front of the delimiter. When the delimiter
 * is not found, res is set to NULL and p is restored to its value on
 * entry. */
#define PARSE_STRING(p, del, res) \
G_STMT_START { \
gchar *t = p; \
p = strstr (p, del); \
if (p == NULL) { \
res = NULL; \
p = t; \
} \
else { \
*p = '\0'; \
p++; \
res = t; \
} \
} G_STMT_END
/* SKIP_SPACES (p): advances p past leading ASCII whitespace. The expansion
 * already ends in ';' and is a bare while statement, not wrapped in
 * G_STMT_START/G_STMT_END. */
#define SKIP_SPACES(p) \
while (*p && g_ascii_isspace (*p)) \
p++;
/* rtpmap contains:
*
* <payload> <encoding_name>/<clock_rate>[/<encoding_params>]
*
* Splits @rtpmap into its components. The string is modified in place
* (delimiters are overwritten with '\0' even though the parameter is
* declared const) and @name and @params point into it afterwards.
*
* @payload: receives the payload number, -1 when there is no space
* @name: receives the encoding name
* @rate: receives the clock rate, -1 when the rtpmap has no '/'
* @params: receives the encoding parameters; left untouched when there are
* none, so the caller has to initialise it
*
* Returns FALSE when no space follows the payload number or when nothing
* follows the payload number, TRUE otherwise.
*/
static gboolean
gst_sdp_demux_parse_rtpmap (const gchar * rtpmap, gint * payload, gchar ** name,
gint * rate, gchar ** params)
{
gchar *p, *t;
t = p = (gchar *) rtpmap;
/* payload number is everything up to the first space */
PARSE_INT (p, " ", *payload);
if (*payload == -1)
return FALSE;
SKIP_SPACES (p);
if (*p == '\0')
return FALSE;
/* encoding name is everything up to the first '/' */
PARSE_STRING (p, "/", *name);
if (*name == NULL) {
GST_DEBUG ("no rate, name %s", p);
/* no rate, assume -1 then */
*name = p;
*rate = -1;
return TRUE;
}
/* clock rate runs up to the next '/' or to the end of the string */
t = p;
p = strstr (p, "/");
if (p == NULL) {
*rate = atoi (t);
return TRUE;
}
*p = '\0';
p++;
*rate = atoi (t);
/* whatever follows the second '/' are the encoding params */
t = p;
if (*p == '\0')
return TRUE;
*params = t;
return TRUE;
}
/*
* Mapping of caps to and from SDP fields:
*
* m=<media> <UDP port> RTP/AVP <payload>
* a=rtpmap:<payload> <encoding_name>/<clock_rate>[/<encoding_params>]
* a=fmtp:<payload> <param>[=<value>];...
*
* Builds "application/x-rtp" caps for payload type @pt from @media. The
* caps carry "media" (lower case), "payload" and "clock-rate", plus
* "encoding-name" (upper case) and "encoding-params" (lower case) when the
* rtpmap provides them, and one string field for every fmtp parameter.
*
* Only the value returned by gst_sdp_media_get_attribute_val() for "rtpmap"
* and for "fmtp" is looked at (presumably the first such attribute - verify
* against the SDP library). Both attribute strings are modified in place
* while they are parsed.
*
* Returns NULL, after a g_warning(), when a dynamic payload type (>= 96)
* has no parsable rtpmap or when no clock rate can be determined.
*/
static GstCaps *
gst_sdp_demux_media_to_caps (gint pt, const GstSDPMedia * media)
{
GstCaps *caps;
const gchar *rtpmap;
const gchar *fmtp;
gchar *name = NULL;
gint rate = -1;
gchar *params = NULL;
gchar *tmp;
GstStructure *s;
gint payload = 0;
gboolean ret;
/* get and parse rtpmap */
if ((rtpmap = gst_sdp_media_get_attribute_val (media, "rtpmap"))) {
ret = gst_sdp_demux_parse_rtpmap (rtpmap, &payload, &name, &rate, &params);
if (ret) {
if (payload != pt) {
/* we ignore the rtpmap if the payload type is different. */
g_warning ("rtpmap of wrong payload type, ignoring");
name = NULL;
rate = -1;
params = NULL;
}
} else {
/* if we failed to parse the rtpmap for a dynamic payload type, we have an
* error */
if (pt >= 96)
goto no_rtpmap;
/* else we can ignore */
g_warning ("error parsing rtpmap, ignoring");
}
} else {
/* dynamic payloads need rtpmap or we fail */
if (pt >= 96)
goto no_rtpmap;
}
/* check if we have a rate, if not, we need to look up the rate from the
* default rates based on the payload types. */
if (rate == -1) {
const GstRTPPayloadInfo *info;
if (GST_RTP_PAYLOAD_IS_DYNAMIC (pt)) {
/* dynamic types, use media and encoding_name */
/* NOTE(review): name is NULL here when the rtpmap was ignored because of
* a payload mismatch - confirm the lookup tolerates a NULL name */
tmp = g_ascii_strdown (media->media, -1);
info = gst_rtp_payload_info_for_name (tmp, name);
g_free (tmp);
} else {
/* static types, use payload type */
info = gst_rtp_payload_info_for_pt (pt);
}
if (info) {
/* a clock rate of 0 in the table counts as unknown */
if ((rate = info->clock_rate) == 0)
rate = -1;
}
/* we fail if we cannot find one */
if (rate == -1)
goto no_rate;
}
tmp = g_ascii_strdown (media->media, -1);
caps = gst_caps_new_simple ("application/x-rtp",
"media", G_TYPE_STRING, tmp, "payload", G_TYPE_INT, pt, NULL);
g_free (tmp);
s = gst_caps_get_structure (caps, 0);
gst_structure_set (s, "clock-rate", G_TYPE_INT, rate, NULL);
/* encoding name must be upper case */
if (name != NULL) {
tmp = g_ascii_strup (name, -1);
gst_structure_set (s, "encoding-name", G_TYPE_STRING, tmp, NULL);
g_free (tmp);
}
/* params must be lower case */
if (params != NULL) {
tmp = g_ascii_strdown (params, -1);
gst_structure_set (s, "encoding-params", G_TYPE_STRING, tmp, NULL);
g_free (tmp);
}
/* parse optional fmtp: field */
if ((fmtp = gst_sdp_media_get_attribute_val (media, "fmtp"))) {
gchar *p;
/* shadows the outer payload: this one is the payload number of the fmtp */
gint payload = 0;
p = (gchar *) fmtp;
/* p is now of the format <payload> <param>[=<value>];... */
PARSE_INT (p, " ", payload);
/* an fmtp without a space or for another payload type is skipped */
if (payload != -1 && payload == pt) {
gchar **pairs;
gint i;
/* <param>[=<value>] are separated with ';' */
pairs = g_strsplit (p, ";", 0);
for (i = 0; pairs[i]; i++) {
gchar *valpos;
gchar *val, *key;
/* the key may not have a '=', the value can have other '='s */
valpos = strstr (pairs[i], "=");
if (valpos) {
/* we have a '=' and thus a value, remove the '=' with \0 */
*valpos = '\0';
/* value is everything between '=' and ';'. FIXME, strip? */
val = g_strstrip (valpos + 1);
} else {
/* simple <param>;.. is translated into <param>=1;... */
val = "1";
}
/* strip the key of spaces, convert key to lowercase but not the value. */
key = g_strstrip (pairs[i]);
/* keys of one character or less are not added to the caps */
if (strlen (key) > 1) {
tmp = g_ascii_strdown (key, -1);
gst_structure_set (s, tmp, G_TYPE_STRING, val, NULL);
g_free (tmp);
}
}
g_strfreev (pairs);
}
}
return caps;
/* ERRORS */
no_rtpmap:
{
g_warning ("rtpmap type not given for dynamic payload %d", pt);
return NULL;
}
no_rate:
{
g_warning ("rate unknown for payload type %d", pt);
return NULL;
}
}
/* "pad-added" handler of the session manager. A pad named
 * recv_rtp_src_<session>_<ssrc>_<pt> carries payloaded RTP packets of the
 * stream whose id equals <session>: the pad is ghosted on the element, the
 * timeout of the RTP UDP source is switched off and, when every stream that
 * needs a pad has one, no-more-pads is signalled. Pads with another name or
 * an unknown session id are ignored. */
static void
new_session_pad (GstElement * session, GstPad * pad, GstSDPDemux * demux)
{
  gint session_id, ssrc, pt;
  gchar *padname;
  GstSDPStream *stream = NULL;
  GstPadTemplate *tmpl;
  GList *item;
  gboolean complete = TRUE;

  GST_DEBUG_OBJECT (demux, "got new session pad %" GST_PTR_FORMAT, pad);

  GST_SDP_STREAM_LOCK (demux);

  /* find stream */
  padname = gst_object_get_name (GST_OBJECT_CAST (pad));
  if (sscanf (padname, "recv_rtp_src_%d_%d_%d", &session_id, &ssrc,
          &pt) == 3) {
    GST_DEBUG_OBJECT (demux, "stream: %u, SSRC %d, PT %d", session_id, ssrc,
        pt);
    stream = find_stream (demux, GINT_TO_POINTER (session_id),
        (gpointer) find_stream_by_id);
  }

  if (stream == NULL) {
    GST_DEBUG_OBJECT (demux, "ignoring unknown stream");
    GST_SDP_STREAM_UNLOCK (demux);
    g_free (padname);
    return;
  }

  /* no need for a timeout anymore now */
  g_object_set (G_OBJECT (stream->udpsrc[0]), "timeout", (guint64) 0, NULL);

  /* create a new pad we will use to stream to */
  tmpl = gst_static_pad_template_get (&rtptemplate);
  stream->srcpad = gst_ghost_pad_new_from_template (padname, pad, tmpl);
  gst_object_unref (tmpl);
  g_free (padname);

  stream->added = TRUE;
  gst_pad_set_active (stream->srcpad, TRUE);
  gst_element_add_pad (GST_ELEMENT_CAST (demux), stream->srcpad);

  /* check if we added all streams: a container stream only needs one pad
   * added and disabled streams don't count */
  for (item = demux->streams; item != NULL && complete;
      item = g_list_next (item)) {
    GstSDPStream *other = (GstSDPStream *) item->data;

    if (!other->container && !other->disabled && !other->added)
      complete = FALSE;
  }
  GST_SDP_STREAM_UNLOCK (demux);

  if (complete) {
    GST_DEBUG_OBJECT (demux, "We added all streams");
    /* when we get here, all stream are added and we can fire the
     * no-more-pads signal. */
    gst_element_no_more_pads (GST_ELEMENT_CAST (demux));
  }
}
/* "request-pt-map" handler of the session manager: returns a new reference
 * to the caps of the stream whose id equals @session, or NULL when there is
 * no such stream or it has no caps. @pt is only used for logging. */
static GstCaps *
request_pt_map (GstElement * sess, guint session, guint pt, GstSDPDemux * demux)
{
  GstSDPStream *stream;
  GstCaps *result = NULL;

  GST_DEBUG_OBJECT (demux, "getting pt map for pt %d in session %d", pt,
      session);

  GST_SDP_STREAM_LOCK (demux);
  stream = find_stream (demux, GINT_TO_POINTER (session),
      (gpointer) find_stream_by_id);
  if (stream != NULL) {
    result = stream->caps;
    if (result != NULL)
      gst_caps_ref (result);
  } else {
    GST_DEBUG_OBJECT (demux, "unknown stream %d", session);
  }
  GST_SDP_STREAM_UNLOCK (demux);

  return result;
}
/* Marks the stream whose id equals @session as EOS and pushes an EOS event
 * for it. Unknown sessions and streams that already are EOS are only
 * logged. */
static void
gst_sdp_demux_do_stream_eos (GstSDPDemux * demux, guint session)
{
  GstSDPStream *stream;

  GST_DEBUG_OBJECT (demux, "setting stream for session %u to EOS", session);

  /* get stream for session */
  stream = find_stream (demux, GINT_TO_POINTER (session),
      (gpointer) find_stream_by_id);

  if (stream == NULL) {
    GST_DEBUG_OBJECT (demux, "unknown stream for session %u", session);
  } else if (stream->eos) {
    GST_DEBUG_OBJECT (demux, "stream for session %u was already EOS", session);
  } else {
    stream->eos = TRUE;
    gst_sdp_demux_stream_push_event (demux, stream, gst_event_new_eos ());
  }
}
/* "on-bye-ssrc" handler of the session manager: logs the SSRC and puts the
 * stream of @session into EOS. */
static void
on_bye_ssrc (GstElement * manager, guint session, guint32 ssrc,
    GstSDPDemux * demux)
{
  GST_DEBUG_OBJECT (demux, "SSRC %08x in session %u received BYE", ssrc,
      session);

  gst_sdp_demux_do_stream_eos (demux, session);
}
/* Handler connected to both the "on-bye-timeout" and the "on-timeout"
 * signal of the session manager: logs the SSRC and puts the stream of
 * @session into EOS. */
static void
on_timeout (GstElement * manager, guint session, guint32 ssrc,
    GstSDPDemux * demux)
{
  GST_DEBUG_OBJECT (demux, "SSRC %08x in session %u timed out", ssrc, session);

  gst_sdp_demux_do_stream_eos (demux, session);
}
/* Creates the "gstrtpbin" session manager, adds it to the bin, brings it to
 * PAUSED, applies the configured latency and connects the pad-added,
 * request-pt-map, BYE and timeout handlers.
 *
 * Returns FALSE when the element cannot be created or fails to reach
 * PAUSED; demux->session is NULL in both cases. */
static gboolean
gst_sdp_demux_configure_manager (GstSDPDemux * demux)
{
  GstElement *rtpbin;

  /* configure the session manager */
  rtpbin = gst_element_factory_make ("gstrtpbin", NULL);
  demux->session = rtpbin;
  if (rtpbin == NULL) {
    GST_DEBUG_OBJECT (demux, "no session manager element gstrtpbin found");
    return FALSE;
  }

  /* we manage this element */
  gst_bin_add (GST_BIN_CAST (demux), rtpbin);

  if (gst_element_set_state (rtpbin,
          GST_STATE_PAUSED) == GST_STATE_CHANGE_FAILURE) {
    GST_DEBUG_OBJECT (demux, "could not start session");
    gst_element_set_state (demux->session, GST_STATE_NULL);
    gst_bin_remove (GST_BIN_CAST (demux), demux->session);
    demux->session = NULL;
    return FALSE;
  }

  g_object_set (rtpbin, "latency", demux->latency, NULL);

  /* connect to signals if we did not already do so */
  GST_DEBUG_OBJECT (demux, "connect to signals on session manager");
  demux->session_sig_id =
      g_signal_connect (rtpbin, "pad-added", (GCallback) new_session_pad,
      demux);
  demux->session_ptmap_id =
      g_signal_connect (rtpbin, "request-pt-map", (GCallback) request_pt_map,
      demux);
  g_signal_connect (rtpbin, "on-bye-ssrc", (GCallback) on_bye_ssrc, demux);
  g_signal_connect (rtpbin, "on-bye-timeout", (GCallback) on_timeout, demux);
  g_signal_connect (rtpbin, "on-timeout", (GCallback) on_timeout, demux);

  return TRUE;
}
/* Creates the UDP sources of @stream and links them to the session manager.
 *
 * For a multicast stream the sources are pointed at the multicast
 * destination; otherwise they listen on 0.0.0.0. A source is created for
 * the RTP port (with the configured timeout) and for the RTCP port, unless
 * the respective port is -1. Each source is added to the bin, linked to the
 * recv_rtp_sink_<id> / recv_rtcp_sink_<id> request pad of the session
 * manager and set to PAUSED.
 *
 * Returns FALSE when a UDP source element cannot be created. */
static gboolean
gst_sdp_demux_stream_configure_udp (GstSDPDemux * demux, GstSDPStream * stream)
{
  gchar *uri, *name;
  const gchar *destination;
  GstPad *pad;

  GST_DEBUG_OBJECT (demux, "creating UDP sources");

  /* if the destination is not a multicast address, we just want to listen on
   * our local ports */
  if (!stream->multicast)
    destination = "0.0.0.0";
  else
    destination = stream->destination;

  /* creating UDP source */
  if (stream->rtp_port != -1) {
    GST_DEBUG_OBJECT (demux, "receiving RTP from %s:%d", destination,
        stream->rtp_port);

    uri = g_strdup_printf ("udp://%s:%d", destination, stream->rtp_port);
    stream->udpsrc[0] = gst_element_make_from_uri (GST_URI_SRC, uri, NULL);
    g_free (uri);
    if (stream->udpsrc[0] == NULL)
      goto no_element;

    /* take ownership */
    gst_bin_add (GST_BIN_CAST (demux), stream->udpsrc[0]);

    /* the timeout is an unsigned 64-bit property, print it as such */
    GST_DEBUG_OBJECT (demux,
        "setting up UDP source with timeout %" G_GUINT64_FORMAT,
        demux->udp_timeout);

    /* configure a timeout on the UDP port. When the timeout message is
     * posted, we assume UDP transport is not possible. */
    g_object_set (G_OBJECT (stream->udpsrc[0]), "timeout", demux->udp_timeout,
        NULL);

    /* get output pad of the UDP source. */
    pad = gst_element_get_static_pad (stream->udpsrc[0], "src");

    name = g_strdup_printf ("recv_rtp_sink_%d", stream->id);
    stream->channelpad[0] = gst_element_get_request_pad (demux->session, name);
    g_free (name);

    GST_DEBUG_OBJECT (demux, "connecting RTP source 0 to manager");
    /* configure for UDP delivery, we need to connect the UDP pads to
     * the session plugin. */
    gst_pad_link (pad, stream->channelpad[0]);
    gst_object_unref (pad);

    /* change state */
    gst_element_set_state (stream->udpsrc[0], GST_STATE_PAUSED);
  }

  /* creating another UDP source */
  if (stream->rtcp_port != -1) {
    GST_DEBUG_OBJECT (demux, "receiving RTCP from %s:%d", destination,
        stream->rtcp_port);

    uri = g_strdup_printf ("udp://%s:%d", destination, stream->rtcp_port);
    stream->udpsrc[1] = gst_element_make_from_uri (GST_URI_SRC, uri, NULL);
    g_free (uri);
    if (stream->udpsrc[1] == NULL)
      goto no_element;

    /* take ownership */
    gst_bin_add (GST_BIN_CAST (demux), stream->udpsrc[1]);

    GST_DEBUG_OBJECT (demux, "connecting RTCP source to manager");

    name = g_strdup_printf ("recv_rtcp_sink_%d", stream->id);
    stream->channelpad[1] = gst_element_get_request_pad (demux->session, name);
    g_free (name);

    pad = gst_element_get_static_pad (stream->udpsrc[1], "src");
    gst_pad_link (pad, stream->channelpad[1]);
    gst_object_unref (pad);

    gst_element_set_state (stream->udpsrc[1], GST_STATE_PAUSED);
  }
  return TRUE;

  /* ERRORS */
no_element:
  {
    GST_DEBUG_OBJECT (demux, "no UDP source element found");
    return FALSE;
  }
}
/* configure the UDP sink back to the server for status reports
 *
 * Creates a UDP sink for the destination address and RTCP port of @stream,
 * lets it share the socket of the RTCP UDP source when there is one, keeps
 * it locked in PLAYING inside the bin and links the send_rtcp_src_<id>
 * request pad of the session manager to it. For non-multicast streams all
 * destinations of the sink are cleared, so no RTCP is actually sent.
 *
 * Returns FALSE when the UDP sink element cannot be created. Failing to get
 * the RTCP pad from the session manager is only logged. */
static gboolean
gst_sdp_demux_stream_configure_udp_sink (GstSDPDemux * demux,
    GstSDPStream * stream)
{
  GstPad *pad, *sinkpad;
  gint port, sockfd = -1;
  gchar *destination, *uri, *name;

  /* get destination and port */
  port = stream->rtcp_port;
  destination = stream->destination;

  GST_DEBUG_OBJECT (demux, "configure UDP sink for %s:%d", destination, port);

  uri = g_strdup_printf ("udp://%s:%d", destination, port);
  stream->udpsink = gst_element_make_from_uri (GST_URI_SINK, uri, NULL);
  g_free (uri);
  if (stream->udpsink == NULL)
    goto no_sink_element;

  /* we clear all destinations because we don't really know where to send the
   * RTCP to and we want to avoid sending it to our own ports.
   * FIXME when we get an RTCP packet from the sender, we could look at its
   * source port and address and try to send RTCP there. */
  if (!stream->multicast)
    g_signal_emit_by_name (stream->udpsink, "clear");

  /* no sync needed */
  g_object_set (G_OBJECT (stream->udpsink), "sync", FALSE, NULL);
  /* no async state changes needed */
  g_object_set (G_OBJECT (stream->udpsink), "async", FALSE, NULL);

  if (stream->udpsrc[1]) {
    /* configure socket, we give it the same UDP socket as the udpsrc for RTCP
     * because some servers check the port number of where it sends RTCP to
     * identify the RTCP packets it receives */
    g_object_get (G_OBJECT (stream->udpsrc[1]), "sock", &sockfd, NULL);
    GST_DEBUG_OBJECT (demux, "UDP src has sock %d", sockfd);
    /* configure socket and make sure udpsink does not close it when shutting
     * down, it belongs to udpsrc after all. */
    g_object_set (G_OBJECT (stream->udpsink), "sockfd", sockfd, NULL);
    g_object_set (G_OBJECT (stream->udpsink), "closefd", FALSE, NULL);
  }

  /* we keep this playing always */
  gst_element_set_locked_state (stream->udpsink, TRUE);
  gst_element_set_state (stream->udpsink, GST_STATE_PLAYING);

  gst_bin_add (GST_BIN_CAST (demux), stream->udpsink);

  /* get session RTCP pad */
  name = g_strdup_printf ("send_rtcp_src_%d", stream->id);
  pad = gst_element_get_request_pad (demux->session, name);
  g_free (name);

  /* and link */
  if (pad) {
    sinkpad = gst_element_get_static_pad (stream->udpsink, "sink");
    gst_pad_link (pad, sinkpad);
    gst_object_unref (sinkpad);
    /* the request pad came with a reference of our own; the pad stays alive
     * through its parent element, so drop ours instead of leaking it */
    gst_object_unref (pad);
  } else {
    /* not very fatal, we just won't be able to send RTCP */
    GST_WARNING_OBJECT (demux, "could not get session RTCP pad");
  }

  return TRUE;

  /* ERRORS */
no_sink_element:
  {
    GST_DEBUG_OBJECT (demux, "no UDP sink element found");
    return FALSE;
  }
}
/* Records @ret as the last flow return of @stream and combines it with the
 * other streams: a successful value or any error other than NOT_LINKED is
 * returned as is. For NOT_LINKED the streams are walked in list order and
 * the last_ret of the first stream that is not NOT_LINKED is returned;
 * NOT_LINKED is returned only when every stream reported it. */
static GstFlowReturn
gst_sdp_demux_combine_flows (GstSDPDemux * demux, GstSDPStream * stream,
    GstFlowReturn ret)
{
  GList *item;

  /* store the value */
  stream->last_ret = ret;

  /* success and every error except not-linked go straight back */
  if (GST_FLOW_IS_SUCCESS (ret) || ret != GST_FLOW_NOT_LINKED)
    return ret;

  /* only return NOT_LINKED if all other pads returned NOT_LINKED */
  for (item = demux->streams; item != NULL; item = g_list_next (item)) {
    ret = ((GstSDPStream *) item->data)->last_ret;
    /* some other return value (must be SUCCESS but we can return
     * other values as well) */
    if (ret != GST_FLOW_NOT_LINKED)
      break;
  }

  return ret;
}
/* Sends @event to both session manager pads (channelpad[0] and
 * channelpad[1]) of @stream, provided the stream has a source pad. Takes
 * ownership of @event: every send gets its own reference and the caller's
 * reference is dropped at the end. */
static void
gst_sdp_demux_stream_push_event (GstSDPDemux * demux, GstSDPStream * stream,
    GstEvent * event)
{
  gint n;

  /* only streams that have a connection to the outside world */
  if (stream->srcpad != NULL) {
    for (n = 0; n < 2; n++) {
      if (stream->channelpad[n] == NULL)
        continue;

      gst_event_ref (event);
      gst_pad_send_event (stream->channelpad[n], event);
    }
  }

  gst_event_unref (event);
}
/* GstBin::handle_message implementation. Ownership of @message is taken:
 * it is either forwarded to the parent class or unreffed here.
 *
 * - "GstUDPSrcTimeout" element messages: the first one is turned into a
 *   RESOURCE/READ error on the element, later ones are ignored. The message
 *   itself is never forwarded.
 * - error messages from the RTCP UDP source of a stream are dropped; errors
 *   from the RTP UDP source of a stream are dropped only while
 *   gst_sdp_demux_combine_flows() yields GST_FLOW_OK.
 * - everything else goes to the parent class. */
static void
gst_sdp_demux_handle_message (GstBin * bin, GstMessage * message)
{
  GstSDPDemux *demux;

  demux = GST_SDP_DEMUX (bin);

  switch (GST_MESSAGE_TYPE (message)) {
    case GST_MESSAGE_ELEMENT:
    {
      const GstStructure *s = gst_message_get_structure (message);

      if (s != NULL && gst_structure_has_name (s, "GstUDPSrcTimeout")) {
        gboolean ignore_timeout;

        GST_DEBUG_OBJECT (bin, "timeout on UDP port");

        GST_OBJECT_LOCK (demux);
        ignore_timeout = demux->ignore_timeout;
        demux->ignore_timeout = TRUE;
        GST_OBJECT_UNLOCK (demux);

        /* we only act on the first udp timeout message, others are irrelevant
         * and can be ignored. */
        if (!ignore_timeout) {
          /* convert to double first, then scale microseconds to seconds */
          GST_ELEMENT_ERROR (demux, RESOURCE, READ, (NULL),
              ("Could not receive any UDP packets for %.4f seconds, maybe your "
                  "firewall is blocking it.",
                  gst_guint64_to_gdouble (demux->udp_timeout) / 1000000.0));
        }
        /* the message is consumed here in both cases, so release it */
        gst_message_unref (message);
        return;
      }
      GST_BIN_CLASS (parent_class)->handle_message (bin, message);
      break;
    }
    case GST_MESSAGE_ERROR:
    {
      GstObject *udpsrc;
      GstSDPStream *stream;
      GstFlowReturn ret;

      udpsrc = GST_MESSAGE_SRC (message);

      GST_DEBUG_OBJECT (demux, "got error from %s", GST_ELEMENT_NAME (udpsrc));

      stream = find_stream (demux, udpsrc, (gpointer) find_stream_by_udpsrc);
      /* fatal but not our message, forward */
      if (!stream)
        goto forward;

      /* we ignore the RTCP udpsrc */
      if (stream->udpsrc[1] == GST_ELEMENT_CAST (udpsrc))
        goto done;

      /* if we get error messages from the udp sources, that's not a problem as
       * long as not all of them error out. We also don't really know what the
       * problem is, the message does not give enough detail... */
      ret = gst_sdp_demux_combine_flows (demux, stream, GST_FLOW_NOT_LINKED);
      GST_DEBUG_OBJECT (demux, "combined flows: %s", gst_flow_get_name (ret));
      if (ret != GST_FLOW_OK)
        goto forward;

    done:
      gst_message_unref (message);
      break;

    forward:
      GST_BIN_CLASS (parent_class)->handle_message (bin, message);
      break;
    }
    default:
    {
      GST_BIN_CLASS (parent_class)->handle_message (bin, message);
      break;
    }
  }
}
/* Parses the SDP collected in the adapter and sets up reception: creates the
 * session manager, then for every media section a stream with its UDP
 * sources and UDP sink, and finally brings the session manager and the UDP
 * sources to the target state.
 *
 * Runs with the stream lock held. Returns TRUE on success; on failure an
 * element error is posted and FALSE is returned. Streams created before a
 * failure stay in demux->streams until gst_sdp_demux_cleanup() runs. */
static gboolean
gst_sdp_demux_start (GstSDPDemux * demux)
{
  guint8 *data = NULL;
  guint size;
  gint i, n_streams;
  GstSDPMessage sdp = { 0 };
  GstSDPStream *stream = NULL;
  GList *walk;
  gboolean res = FALSE;

  /* grab the lock so that no state change can interfere */
  GST_SDP_STREAM_LOCK (demux);

  GST_DEBUG_OBJECT (demux, "parse SDP...");

  gst_sdp_message_init (&sdp);

  /* nothing was received before EOS: there is no SDP to parse */
  size = gst_adapter_available (demux->adapter);
  if (size == 0)
    goto could_not_parse;

  /* the adapter hands over a newly allocated copy that we have to free */
  data = gst_adapter_take (demux->adapter, size);

  if (gst_sdp_message_parse_buffer (data, size, &sdp) != GST_SDP_OK)
    goto could_not_parse;

  if (demux->debug)
    gst_sdp_message_dump (&sdp);

  /* try to get and configure a manager */
  if (!gst_sdp_demux_configure_manager (demux))
    goto no_manager;

  /* create streams with UDP sources and sinks */
  n_streams = gst_sdp_message_medias_len (&sdp);
  for (i = 0; i < n_streams; i++) {
    stream = gst_sdp_demux_create_stream (demux, &sdp, i);
    /* no stream is created for a media without connection information */
    if (stream == NULL)
      goto transport_failed;

    GST_DEBUG_OBJECT (demux, "configuring transport for stream %p", stream);
    if (!gst_sdp_demux_stream_configure_udp (demux, stream))
      goto transport_failed;
    if (!gst_sdp_demux_stream_configure_udp_sink (demux, stream))
      goto transport_failed;
  }

  /* set target state on session manager */
  gst_element_set_state (demux->session, demux->target);

  /* activate all streams */
  for (walk = demux->streams; walk; walk = g_list_next (walk)) {
    stream = (GstSDPStream *) walk->data;

    /* configure target state on udp sources; a source is missing when its
     * port was -1 */
    for (i = 0; i < 2; i++) {
      if (stream->udpsrc[i])
        gst_element_set_state (stream->udpsrc[i], demux->target);
    }
  }
  res = TRUE;

done:
  /* the streams only borrowed their destination string from the SDP
   * message; clear those pointers before the message is released */
  for (walk = demux->streams; walk; walk = g_list_next (walk))
    ((GstSDPStream *) walk->data)->destination = NULL;

  gst_sdp_message_uninit (&sdp);
  g_free (data);
  GST_SDP_STREAM_UNLOCK (demux);

  return res;

  /* ERRORS */
transport_failed:
  {
    GST_ELEMENT_ERROR (demux, STREAM, TYPE_NOT_FOUND, (NULL),
        ("Could not create RTP stream transport."));
    goto done;
  }
no_manager:
  {
    GST_ELEMENT_ERROR (demux, STREAM, TYPE_NOT_FOUND, (NULL),
        ("Could not create RTP session manager."));
    goto done;
  }
could_not_parse:
  {
    GST_ELEMENT_ERROR (demux, STREAM, TYPE_NOT_FOUND, (NULL),
        ("Could not parse SDP message."));
    goto done;
  }
}
/* Event function of the sink pad. EOS marks the end of the SDP data and
 * triggers gst_sdp_demux_start(), whose result is returned. All other
 * events are dropped and TRUE is returned. The event is consumed in every
 * case. */
static gboolean
gst_sdp_demux_sink_event (GstPad * pad, GstEvent * event)
{
  GstSDPDemux *self = GST_SDP_DEMUX (gst_pad_get_parent (pad));
  gboolean handled = TRUE;

  /* when we get EOS, start parsing the SDP */
  if (GST_EVENT_TYPE (event) == GST_EVENT_EOS)
    handled = gst_sdp_demux_start (self);

  gst_event_unref (event);
  gst_object_unref (self);

  return handled;
}
/* Chain function of the sink pad: appends @buffer to the adapter and always
 * returns GST_FLOW_OK. The collected data is parsed when EOS arrives on the
 * sink pad. */
static GstFlowReturn
gst_sdp_demux_sink_chain (GstPad * pad, GstBuffer * buffer)
{
  GstSDPDemux *self = GST_SDP_DEMUX (gst_pad_get_parent (pad));

  /* the adapter takes over the buffer */
  gst_adapter_push (self->adapter, buffer);

  gst_object_unref (self);

  return GST_FLOW_OK;
}
/* State change handler, run with the stream lock held.
 *
 * Before chaining up: READY->PAUSED clears the adapter, re-arms timeout
 * handling and sets the target state to PAUSED; PAUSED->PLAYING sets the
 * target state to PLAYING.
 *
 * After a successful chain up: READY->PAUSED and PLAYING->PAUSED return
 * NO_PREROLL (the latter also sets the target back to PAUSED) and
 * PAUSED->READY releases all streams and the session manager. */
static GstStateChangeReturn
gst_sdp_demux_change_state (GstElement * element, GstStateChange transition)
{
  GstSDPDemux *self = GST_SDP_DEMUX (element);
  GstStateChangeReturn result;

  GST_SDP_STREAM_LOCK (self);

  if (transition == GST_STATE_CHANGE_READY_TO_PAUSED) {
    /* first attempt, don't ignore timeouts */
    gst_adapter_clear (self->adapter);
    self->ignore_timeout = FALSE;
    self->target = GST_STATE_PAUSED;
  } else if (transition == GST_STATE_CHANGE_PAUSED_TO_PLAYING) {
    self->target = GST_STATE_PLAYING;
  }

  result = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);

  if (result != GST_STATE_CHANGE_FAILURE) {
    if (transition == GST_STATE_CHANGE_READY_TO_PAUSED) {
      result = GST_STATE_CHANGE_NO_PREROLL;
    } else if (transition == GST_STATE_CHANGE_PLAYING_TO_PAUSED) {
      result = GST_STATE_CHANGE_NO_PREROLL;
      self->target = GST_STATE_PAUSED;
    } else if (transition == GST_STATE_CHANGE_PAUSED_TO_READY) {
      gst_sdp_demux_cleanup (self);
    }
  }

  GST_SDP_STREAM_UNLOCK (self);

  return result;
}