gstreamer/gst/sdp/gstsdpdemux.c

1411 lines
38 KiB
C
Raw Normal View History

/* GStreamer
* Copyright (C) <2007> Wim Taymans <wim dot taymans at gmail dot com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 59 Temple Place - Suite 330,
* Boston, MA 02111-1307, USA.
*/
/**
* SECTION:element-sdpdemux
*
* sdpdemux currently understands SDP as the input format of the session description.
* For each stream listed in the SDP a new stream%d pad will be created
* with caps derived from the SDP media description. This is a caps of mime type
* "application/x-rtp" that can be connected to any available RTP depayloader
* element.
*
* sdpdemux will internally instantiate an RTP session manager element
* that will handle the RTCP messages to and from the server, jitter removal,
* packet reordering along with providing a clock for the pipeline.
*
* sdpdemux acts like a live element and will therefore only generate data in the
* PLAYING state.
*
* <refsect2>
* <title>Example launch line</title>
* |[
* gst-launch gnomevfssrc location=http://some.server/session.sdp ! sdpdemux ! fakesink
* ]| Establishes a connection to an HTTP server that provides an SDP session description,
* which is parsed by sdpdemux; the raw RTP packets are then sent to a fakesink.
* </refsect2>
*
* Last reviewed on 2007-10-01 (0.10.6)
*/
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif
#ifdef G_OS_WIN32
#ifdef _MSC_VER
#include <Winsock2.h>
#endif
/* ws2_32.dll has getaddrinfo and freeaddrinfo on Windows XP and later.
 * mingw32 headers check WINVER before allowing the use of these */
#ifndef WINVER
#define WINVER 0x0501
#endif
#include <ws2tcpip.h>
#else
#include <sys/socket.h>
#include <netdb.h>
#include <netinet/in.h>
#endif
#include <stdlib.h>
#include <string.h>
#include <locale.h>
#include <stdio.h>
#include <stdarg.h>
#include <gst/rtp/gstrtppayloads.h>
#include <gst/sdp/gstsdpmessage.h>
#include "gstsdpdemux.h"
/* Debug category of this element. GST_CAT_DEFAULT makes it the category used
 * by the GST_* logging macros in this file; it is initialised in _do_init(). */
GST_DEBUG_CATEGORY_STATIC (sdpdemux_debug);
#define GST_CAT_DEFAULT (sdpdemux_debug)
/* elementfactory information */
static const GstElementDetails gst_sdp_demux_details =
GST_ELEMENT_DETAILS ("SDP session setup",
"Codec/Demuxer/Network/RTP",
"Receive data over the network via SDP",
"Wim Taymans <wim.taymans@gmail.com>");
/* always-present sink pad on which the SDP document is received */
static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
GST_PAD_SINK,
GST_PAD_ALWAYS,
GST_STATIC_CAPS ("application/sdp"));
/* template of the source pads that are added at runtime; the ghost pads
 * created in new_session_pad() are instantiated from it */
static GstStaticPadTemplate rtptemplate = GST_STATIC_PAD_TEMPLATE ("stream%d",
GST_PAD_SRC,
GST_PAD_SOMETIMES,
GST_STATIC_CAPS ("application/x-rtp"));
/* signal ids; no signals are defined, only the terminating entry exists */
enum
{
/* FILL ME */
LAST_SIGNAL
};
/* default values of the "debug", "timeout" and "latency" properties; the
 * timeout is expressed in microseconds and the latency in milliseconds, as
 * stated in the property descriptions installed in class_init */
#define DEFAULT_DEBUG FALSE
#define DEFAULT_TIMEOUT 10000000
#define DEFAULT_LATENCY_MS 200
/* GObject property ids; PROP_0 is a placeholder so real ids start at 1 */
enum
{
PROP_0,
PROP_DEBUG,
PROP_TIMEOUT,
PROP_LATENCY
};
/* forward declarations of the functions that are referenced before their
 * definition further down in this file */
static void gst_sdp_demux_base_init (gpointer g_class);
static void gst_sdp_demux_finalize (GObject * object);
static void gst_sdp_demux_set_property (GObject * object, guint prop_id,
const GValue * value, GParamSpec * pspec);
static void gst_sdp_demux_get_property (GObject * object, guint prop_id,
GValue * value, GParamSpec * pspec);
static GstCaps *gst_sdp_demux_media_to_caps (gint pt,
const GstSDPMedia * media);
static GstStateChangeReturn gst_sdp_demux_change_state (GstElement * element,
GstStateChange transition);
static void gst_sdp_demux_handle_message (GstBin * bin, GstMessage * message);
static void gst_sdp_demux_stream_push_event (GstSDPDemux * demux,
GstSDPStream * stream, GstEvent * event);
static gboolean gst_sdp_demux_sink_event (GstPad * pad, GstEvent * event);
static GstFlowReturn gst_sdp_demux_sink_chain (GstPad * pad,
GstBuffer * buffer);
/* signal id table, disabled because no signals are defined (see LAST_SIGNAL) */
/*static guint gst_sdp_demux_signals[LAST_SIGNAL] = { 0 }; */
/* Extra type-initialisation hook handed to GST_BOILERPLATE_FULL: sets up the
 * "sdpdemux" debug category. The type argument is not used. */
static void
_do_init (GType sdp_demux_type)
{
  GST_DEBUG_CATEGORY_INIT (sdpdemux_debug, "sdpdemux", 0, "SDP demux");
}
/* Declares the GstSDPDemux type as a subclass of GstBin; _do_init is passed
 * as the additional type-initialisation hook. */
GST_BOILERPLATE_FULL (GstSDPDemux, gst_sdp_demux, GstBin, GST_TYPE_BIN,
_do_init);
/* Base initialiser of the class: registers the sink and source pad templates
 * and the element details on the element class. */
static void
gst_sdp_demux_base_init (gpointer g_class)
{
  GstElementClass *eclass = GST_ELEMENT_CLASS (g_class);
  GstPadTemplate *sink_tmpl;
  GstPadTemplate *src_tmpl;

  sink_tmpl = gst_static_pad_template_get (&sinktemplate);
  gst_element_class_add_pad_template (eclass, sink_tmpl);

  src_tmpl = gst_static_pad_template_get (&rtptemplate);
  gst_element_class_add_pad_template (eclass, src_tmpl);

  gst_element_class_set_details (eclass, &gst_sdp_demux_details);
}
/* Class initialiser: hooks up the GObject property accessors and finalizer,
 * installs the "debug", "timeout" and "latency" properties and overrides the
 * element state-change and bin message handlers. */
static void
gst_sdp_demux_class_init (GstSDPDemuxClass * klass)
{
  GObjectClass *object_class = (GObjectClass *) klass;
  GstElementClass *element_class = (GstElementClass *) klass;
  GstBinClass *bin_class = (GstBinClass *) klass;
  GParamSpec *pspec;

  object_class->set_property = gst_sdp_demux_set_property;
  object_class->get_property = gst_sdp_demux_get_property;
  object_class->finalize = gst_sdp_demux_finalize;

  pspec = g_param_spec_boolean ("debug", "Debug",
      "Dump request and response messages to stdout",
      DEFAULT_DEBUG, G_PARAM_READWRITE | G_PARAM_CONSTRUCT);
  g_object_class_install_property (object_class, PROP_DEBUG, pspec);

  pspec = g_param_spec_uint64 ("timeout", "Timeout",
      "Fail transport after UDP timeout microseconds (0 = disabled)",
      0, G_MAXUINT64, DEFAULT_TIMEOUT,
      G_PARAM_READWRITE | G_PARAM_CONSTRUCT);
  g_object_class_install_property (object_class, PROP_TIMEOUT, pspec);

  pspec = g_param_spec_uint ("latency", "Buffer latency in ms",
      "Amount of ms to buffer", 0, G_MAXUINT, DEFAULT_LATENCY_MS,
      G_PARAM_READWRITE | G_PARAM_CONSTRUCT);
  g_object_class_install_property (object_class, PROP_LATENCY, pspec);

  element_class->change_state = gst_sdp_demux_change_state;
  bin_class->handle_message = gst_sdp_demux_handle_message;
}
/* Instance initialiser: creates the sink pad with its event and chain
 * functions, allocates the recursive stream lock and the adapter that
 * collects the incoming SDP data. */
static void
gst_sdp_demux_init (GstSDPDemux * demux, GstSDPDemuxClass * g_class)
{
  GstPad *sink;

  sink = gst_pad_new_from_static_template (&sinktemplate, "sink");
  demux->sinkpad = sink;
  gst_pad_set_event_function (sink,
      GST_DEBUG_FUNCPTR (gst_sdp_demux_sink_event));
  gst_pad_set_chain_function (sink,
      GST_DEBUG_FUNCPTR (gst_sdp_demux_sink_chain));
  gst_element_add_pad (GST_ELEMENT (demux), sink);

  /* protects the streaming thread in interleaved mode or the polling
   * thread in UDP mode. */
  demux->stream_rec_lock = g_new (GStaticRecMutex, 1);
  g_static_rec_mutex_init (demux->stream_rec_lock);

  demux->adapter = gst_adapter_new ();
}
/* GObject finalizer: releases the recursive stream lock and the adapter
 * allocated in the instance initialiser, then chains up to the parent. */
static void
gst_sdp_demux_finalize (GObject * object)
{
  GstSDPDemux *self = GST_SDP_DEMUX (object);

  /* the lock is heap allocated: tear down the mutex, then free its memory */
  g_static_rec_mutex_free (self->stream_rec_lock);
  g_free (self->stream_rec_lock);

  g_object_unref (self->adapter);

  G_OBJECT_CLASS (parent_class)->finalize (object);
}
/* GObject property setter for "debug", "timeout" and "latency"; any other
 * property id triggers the standard invalid-property warning. */
static void
gst_sdp_demux_set_property (GObject * object, guint prop_id,
    const GValue * value, GParamSpec * pspec)
{
  GstSDPDemux *self = GST_SDP_DEMUX (object);

  if (prop_id == PROP_DEBUG) {
    self->debug = g_value_get_boolean (value);
  } else if (prop_id == PROP_TIMEOUT) {
    self->udp_timeout = g_value_get_uint64 (value);
  } else if (prop_id == PROP_LATENCY) {
    self->latency = g_value_get_uint (value);
  } else {
    G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
  }
}
/* GObject property getter for "debug", "timeout" and "latency"; any other
 * property id triggers the standard invalid-property warning. */
static void
gst_sdp_demux_get_property (GObject * object, guint prop_id, GValue * value,
    GParamSpec * pspec)
{
  GstSDPDemux *self = GST_SDP_DEMUX (object);

  if (prop_id == PROP_DEBUG) {
    g_value_set_boolean (value, self->debug);
  } else if (prop_id == PROP_TIMEOUT) {
    g_value_set_uint64 (value, self->udp_timeout);
  } else if (prop_id == PROP_LATENCY) {
    g_value_set_uint (value, self->latency);
  } else {
    G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
  }
}
/* List comparison callback: returns 0 when the id of @stream equals the
 * integer packed into @a, -1 otherwise. */
static gint
find_stream_by_id (GstSDPStream * stream, gconstpointer a)
{
  return (stream->id == GPOINTER_TO_INT (a)) ? 0 : -1;
}
/* List comparison callback: returns 0 when the payload type of @stream
 * equals the integer packed into @a, -1 otherwise. */
static gint
find_stream_by_pt (GstSDPStream * stream, gconstpointer a)
{
  return (stream->pt == GPOINTER_TO_INT (a)) ? 0 : -1;
}
/* List comparison callback: returns 0 when the element @a is one of the two
 * UDP sources of @stream, -1 otherwise. */
static gint
find_stream_by_udpsrc (GstSDPStream * stream, gconstpointer a)
{
  GstElement *wanted = (GstElement *) a;

  if (stream->udpsrc[0] == wanted || stream->udpsrc[1] == wanted)
    return 0;

  return -1;
}
/* Looks up a stream in demux->streams. @func is one of the find_stream_by_*
 * comparison callbacks and @data the value it compares against. Returns the
 * first matching stream or NULL when none matches. */
GstSDPStream *
find_stream (GstSDPDemux * demux, gconstpointer data, gconstpointer func)
{
  GList *match;

  match = g_list_find_custom (demux->streams, data, (GCompareFunc) func);

  return match ? (GstSDPStream *) match->data : NULL;
}
/* Releases everything owned by @stream: its caps, the UDP sources and sink
 * (shut down and removed from the bin), the source pad (deactivated and, if
 * it was added, removed from the element) and finally the stream itself.
 * The stream is not unlinked from demux->streams here. */
static void
gst_sdp_demux_stream_free (GstSDPDemux * demux, GstSDPStream * stream)
{
  GstBin *bin = GST_BIN_CAST (demux);
  gint n;

  GST_DEBUG_OBJECT (demux, "free stream %p", stream);

  if (stream->caps != NULL)
    gst_caps_unref (stream->caps);

  /* index 0 and 1 are the two UDP sources of the stream */
  for (n = 0; n < 2; n++) {
    GstElement *src = stream->udpsrc[n];

    if (src == NULL)
      continue;

    gst_element_set_state (src, GST_STATE_NULL);
    gst_bin_remove (bin, src);
    stream->udpsrc[n] = NULL;
  }

  if (stream->udpsink != NULL) {
    gst_element_set_state (stream->udpsink, GST_STATE_NULL);
    gst_bin_remove (bin, stream->udpsink);
    stream->udpsink = NULL;
  }

  if (stream->srcpad != NULL) {
    gst_pad_set_active (stream->srcpad, FALSE);
    /* only pads that were exposed on the element need to be removed */
    if (stream->added) {
      gst_element_remove_pad (GST_ELEMENT_CAST (demux), stream->srcpad);
      stream->added = FALSE;
    }
    stream->srcpad = NULL;
  }

  g_free (stream);
}
/* Resolves @host_name and reports whether any of the resulting addresses is
 * an IPv4 or IPv6 multicast address.
 *
 * Returns FALSE when @host_name is NULL, when resolution fails or when no
 * resolved address is multicast. */
static gboolean
is_multicast_address (const gchar * host_name)
{
  struct addrinfo hints;
  struct addrinfo *ai;
  struct addrinfo *res = NULL;
  gboolean ret = FALSE;

  g_return_val_if_fail (host_name, FALSE);

  memset (&hints, 0, sizeof (hints));
  hints.ai_socktype = SOCK_DGRAM;

  /* getaddrinfo() signals failure with any non-zero value; the sign of the
   * error codes differs between platforms, so a "< 0" test is not enough and
   * would leave res unset on a positive error code */
  if (getaddrinfo (host_name, NULL, &hints, &res) != 0)
    return FALSE;

  /* stop at the first multicast address */
  for (ai = res; !ret && ai; ai = ai->ai_next) {
    if (ai->ai_family == AF_INET) {
      ret =
          IN_MULTICAST (ntohl (((struct sockaddr_in *) ai->ai_addr)->
              sin_addr.s_addr));
    } else if (ai->ai_family == AF_INET6) {
      ret =
          IN6_IS_ADDR_MULTICAST (&((struct sockaddr_in6 *) ai->
              ai_addr)->sin6_addr);
    }
    /* other address families cannot be interpreted here and are skipped */
  }

  freeaddrinfo (res);

  return ret;
}
/* Creates the stream object for media number @idx of @sdp and appends it to
 * demux->streams.
 *
 * The payload type and caps are taken from the first format of the media.
 * The destination address and TTL come from the media-level connection, or
 * from the session-level connection when the media has none.
 *
 * Returns the new stream, or NULL when the media does not exist or no
 * connection information is available. */
static GstSDPStream *
gst_sdp_demux_create_stream (GstSDPDemux * demux, GstSDPMessage * sdp, gint idx)
{
  GstSDPStream *stream;
  const gchar *format, *rtcp;
  const GstSDPMedia *media;
  const GstSDPConnection *conn;

  /* get media, should not return NULL */
  media = gst_sdp_message_get_media (sdp, idx);
  if (media == NULL)
    return NULL;

  stream = g_new0 (GstSDPStream, 1);
  stream->parent = demux;
  /* a fresh stream starts with an OK flow return and is neither added,
   * disabled nor EOS */
  stream->last_ret = GST_FLOW_OK;
  stream->added = FALSE;
  stream->disabled = FALSE;
  stream->id = demux->numstreams++;
  stream->eos = FALSE;

  /* we must have a payload. No payload means we cannot create caps */
  /* FIXME, handle multiple formats. */
  format = gst_sdp_media_get_format (media, 0);
  if (format != NULL) {
    stream->pt = atoi (format);
    /* convert caps */
    stream->caps = gst_sdp_demux_media_to_caps (stream->pt, media);

    /* A dynamic payload type that is already used by an earlier stream
     * means both belong to the same container, so only one pad is needed. */
    if (stream->pt >= 96) {
      if (find_stream (demux, GINT_TO_POINTER (stream->pt),
              (gpointer) find_stream_by_pt) != NULL)
        stream->container = TRUE;
    }
  }

  /* prefer the media-level connection, fall back to the session-level one */
  conn = gst_sdp_media_get_connection (media, 0);
  if (conn == NULL)
    conn = gst_sdp_message_get_connection (sdp);
  if (conn == NULL) {
    gst_sdp_demux_stream_free (demux, stream);
    return NULL;
  }

  /* the address is not copied, the stream points into the SDP message */
  stream->destination = conn->address;
  stream->ttl = conn->ttl;
  stream->multicast = is_multicast_address (stream->destination);

  stream->rtp_port = gst_sdp_media_get_port (media);
  if ((rtcp = gst_sdp_media_get_attribute_val (media, "rtcp"))) {
    /* FIXME, RFC 3605: the attribute value is not interpreted yet */
    stream->rtcp_port = stream->rtp_port + 1;
  } else {
    stream->rtcp_port = stream->rtp_port + 1;
  }

  GST_DEBUG_OBJECT (demux, "stream %d, (%p)", stream->id, stream);
  GST_DEBUG_OBJECT (demux, " pt: %d", stream->pt);
  GST_DEBUG_OBJECT (demux, " container: %d", stream->container);
  GST_DEBUG_OBJECT (demux, " caps: %" GST_PTR_FORMAT, stream->caps);

  /* we keep track of all streams */
  demux->streams = g_list_append (demux->streams, stream);

  return stream;
}
/* Frees all streams and the stream list, disconnects the pad-added handler
 * from the session manager, shuts the manager down and removes it from the
 * bin, and resets the stream counter. */
static void
gst_sdp_demux_cleanup (GstSDPDemux * demux)
{
  GList *item;

  GST_DEBUG_OBJECT (demux, "cleanup");

  item = demux->streams;
  while (item != NULL) {
    gst_sdp_demux_stream_free (demux, (GstSDPStream *) item->data);
    item = item->next;
  }
  g_list_free (demux->streams);
  demux->streams = NULL;

  if (demux->session != NULL) {
    if (demux->session_sig_id != 0) {
      g_signal_handler_disconnect (demux->session, demux->session_sig_id);
      demux->session_sig_id = 0;
    }
    gst_element_set_state (demux->session, GST_STATE_NULL);
    gst_bin_remove (GST_BIN_CAST (demux), demux->session);
    demux->session = NULL;
  }

  demux->numstreams = 0;
}
/* Tokenising helpers. They split the string in place: the delimiter found is
 * overwritten with a NUL byte, so @p must point into writable memory.
 * @del is expected to be a one-character delimiter string because the
 * cursor is advanced by exactly one byte past the match. */

/* Parses the integer in front of the first @del in @p. On success @res is the
 * atoi() value of that token and @p points just past the delimiter. When
 * @del is not found @res is -1 and @p is NULL. */
#define PARSE_INT(p, del, res)                  \
G_STMT_START {                                  \
  gchar *parse_tok_ = (p);                      \
  (p) = strstr ((p), (del));                    \
  if ((p) == NULL)                              \
    (res) = -1;                                 \
  else {                                        \
    *(p) = '\0';                                \
    (p)++;                                      \
    (res) = atoi (parse_tok_);                  \
  }                                             \
} G_STMT_END

/* Extracts the token in front of the first @del in @p. On success @res points
 * to the NUL-terminated token and @p just past the delimiter. When @del is
 * not found @res is NULL and @p is left unchanged. */
#define PARSE_STRING(p, del, res)               \
G_STMT_START {                                  \
  gchar *parse_tok_ = (p);                      \
  (p) = strstr ((p), (del));                    \
  if ((p) == NULL) {                            \
    (res) = NULL;                               \
    (p) = parse_tok_;                           \
  }                                             \
  else {                                        \
    *(p) = '\0';                                \
    (p)++;                                      \
    (res) = parse_tok_;                         \
  }                                             \
} G_STMT_END

/* Advances @p past leading ASCII whitespace. Wrapped in a statement block so
 * that it behaves like a single statement, also under an unbraced if/else. */
#define SKIP_SPACES(p)                          \
G_STMT_START {                                  \
  while (*(p) && g_ascii_isspace (*(p)))        \
    (p)++;                                      \
} G_STMT_END
/* Parses an rtpmap attribute value of the form
 *
 * <payload> <encoding_name>/<clock_rate>[/<encoding_params>]
 *
 * The string is split in place (delimiters are overwritten with NUL bytes
 * despite the const qualifier) and @name / @params point into it afterwards.
 * @rate is -1 when no clock rate is present; @params is only written when
 * non-empty encoding parameters follow the rate.
 *
 * Returns FALSE when the payload number or the encoding name is missing.
 */
static gboolean
gst_sdp_demux_parse_rtpmap (const gchar * rtpmap, gint * payload, gchar ** name,
    gint * rate, gchar ** params)
{
  gchar *cursor = (gchar *) rtpmap;
  gchar *rate_str;
  gchar *slash;

  PARSE_INT (cursor, " ", *payload);
  if (*payload == -1)
    return FALSE;

  SKIP_SPACES (cursor);
  if (*cursor == '\0')
    return FALSE;

  PARSE_STRING (cursor, "/", *name);
  if (*name == NULL) {
    GST_DEBUG ("no rate, name %s", cursor);
    /* no rate, assume -1 then */
    *name = cursor;
    *rate = -1;
    return TRUE;
  }

  /* the clock rate runs up to the next '/' or to the end of the string */
  rate_str = cursor;
  slash = strchr (rate_str, '/');
  if (slash != NULL)
    *slash = '\0';
  *rate = atoi (rate_str);

  /* whatever follows the second '/' are the encoding parameters */
  if (slash != NULL && slash[1] != '\0')
    *params = slash + 1;

  return TRUE;
}
/*
 * Mapping of caps to and from SDP fields:
 *
 * m=<media> <UDP port> RTP/AVP <payload>
 * a=rtpmap:<payload> <encoding_name>/<clock_rate>[/<encoding_params>]
 * a=fmtp:<payload> <param>[=<value>];...
 *
 * Builds "application/x-rtp" caps for payload type @pt from the rtpmap and
 * fmtp attributes of @media. Only the first rtpmap and the first fmtp
 * attribute are looked at, and each is ignored when its payload number
 * differs from @pt.
 *
 * The attribute strings are parsed on private copies so that the SDP message
 * itself is left untouched.
 *
 * Returns the new caps, or NULL (after a g_warning) when a dynamic payload
 * type (>= 96) has no usable rtpmap or when no clock rate can be determined.
 */
static GstCaps *
gst_sdp_demux_media_to_caps (gint pt, const GstSDPMedia * media)
{
  GstCaps *caps = NULL;
  const gchar *rtpmap;
  const gchar *fmtp;
  gchar *rtpmap_copy = NULL;
  gchar *name = NULL;
  gint rate = -1;
  gchar *params = NULL;
  gchar *tmp;
  GstStructure *s;
  gint payload = 0;

  /* get and parse rtpmap */
  if ((rtpmap = gst_sdp_media_get_attribute_val (media, "rtpmap"))) {
    /* the parser splits its input in place; name and params point into this
     * copy, which therefore has to stay alive until the caps are complete */
    rtpmap_copy = g_strdup (rtpmap);
    if (gst_sdp_demux_parse_rtpmap (rtpmap_copy, &payload, &name, &rate,
            &params)) {
      if (payload != pt) {
        /* we ignore the rtpmap if the payload type is different. */
        g_warning ("rtpmap of wrong payload type, ignoring");
        name = NULL;
        rate = -1;
        params = NULL;
      }
    } else {
      /* if we failed to parse the rtpmap for a dynamic payload type, we have
       * an error */
      if (pt >= 96)
        goto no_rtpmap;
      /* else we can ignore */
      g_warning ("error parsing rtpmap, ignoring");
    }
  } else {
    /* dynamic payloads need rtpmap or we fail */
    if (pt >= 96)
      goto no_rtpmap;
  }

  /* check if we have a rate, if not, we need to look up the rate from the
   * default rates based on the payload types. */
  if (rate == -1) {
    const GstRTPPayloadInfo *info = NULL;

    if (GST_RTP_PAYLOAD_IS_DYNAMIC (pt)) {
      /* dynamic types, use media and encoding_name. Without an encoding name
       * (rtpmap ignored above) there is nothing to look up. */
      if (name != NULL) {
        tmp = g_ascii_strdown (media->media, -1);
        info = gst_rtp_payload_info_for_name (tmp, name);
        g_free (tmp);
      }
    } else {
      /* static types, use payload type */
      info = gst_rtp_payload_info_for_pt (pt);
    }

    if (info) {
      if ((rate = info->clock_rate) == 0)
        rate = -1;
    }
    /* we fail if we cannot find one */
    if (rate == -1)
      goto no_rate;
  }

  tmp = g_ascii_strdown (media->media, -1);
  caps = gst_caps_new_simple ("application/x-rtp",
      "media", G_TYPE_STRING, tmp, "payload", G_TYPE_INT, pt, NULL);
  g_free (tmp);
  s = gst_caps_get_structure (caps, 0);

  gst_structure_set (s, "clock-rate", G_TYPE_INT, rate, NULL);

  /* encoding name must be upper case */
  if (name != NULL) {
    tmp = g_ascii_strup (name, -1);
    gst_structure_set (s, "encoding-name", G_TYPE_STRING, tmp, NULL);
    g_free (tmp);
  }

  /* params must be lower case */
  if (params != NULL) {
    tmp = g_ascii_strdown (params, -1);
    gst_structure_set (s, "encoding-params", G_TYPE_STRING, tmp, NULL);
    g_free (tmp);
  }

  /* parse optional fmtp: field */
  if ((fmtp = gst_sdp_media_get_attribute_val (media, "fmtp"))) {
    gchar *fmtp_copy = g_strdup (fmtp);
    gchar *p = fmtp_copy;
    gint fmtp_pt = 0;

    /* p is now of the format <payload> <param>[=<value>];... */
    PARSE_INT (p, " ", fmtp_pt);
    if (fmtp_pt != -1 && fmtp_pt == pt) {
      gchar **pairs;
      gint i;

      /* <param>[=<value>] are separated with ';' */
      pairs = g_strsplit (p, ";", 0);
      for (i = 0; pairs[i]; i++) {
        gchar *valpos;
        const gchar *val;
        gchar *key;

        /* the key may not have a '=', the value can have other '='s */
        valpos = strstr (pairs[i], "=");
        if (valpos) {
          /* we have a '=' and thus a value, remove the '=' with \0 */
          *valpos = '\0';
          /* value is everything between '=' and ';'. FIXME, strip? */
          val = g_strstrip (valpos + 1);
        } else {
          /* simple <param>;.. is translated into <param>=1;... */
          val = "1";
        }
        /* strip the key of spaces, convert key to lowercase but not the
         * value. */
        key = g_strstrip (pairs[i]);
        if (strlen (key) > 1) {
          tmp = g_ascii_strdown (key, -1);
          gst_structure_set (s, tmp, G_TYPE_STRING, val, NULL);
          g_free (tmp);
        }
      }
      g_strfreev (pairs);
    }
    g_free (fmtp_copy);
  }

  g_free (rtpmap_copy);
  return caps;

  /* ERRORS */
no_rtpmap:
  {
    g_warning ("rtpmap type not given for dynamic payload %d", pt);
    g_free (rtpmap_copy);
    return NULL;
  }
no_rate:
  {
    g_warning ("rate unknown for payload type %d", pt);
    g_free (rtpmap_copy);
    return NULL;
  }
}
/* "pad-added" handler of the session manager, called when it exposes a new
 * source pad with payloaded RTP packets. The pad name is expected to look
 * like recv_rtp_src_<session>_<ssrc>_<pt>; the session number identifies the
 * stream. The pad is ghosted onto the demuxer and, once every stream that
 * needs a pad has one, no-more-pads is signalled.
 *
 * Pads whose name cannot be parsed or whose session is unknown are ignored. */
static void
new_session_pad (GstElement * session, GstPad * pad, GstSDPDemux * demux)
{
  gchar *name;
  GstPadTemplate *template;
  gint id, pt;
  guint ssrc;
  GList *lstream;
  GstSDPStream *stream;
  gboolean all_added;

  GST_DEBUG_OBJECT (demux, "got new session pad %" GST_PTR_FORMAT, pad);

  GST_SDP_STREAM_LOCK (demux);
  /* find stream */
  name = gst_object_get_name (GST_OBJECT_CAST (pad));
  /* the SSRC is an unsigned 32-bit value, so it must be scanned as unsigned;
   * values above G_MAXINT do not fit a signed conversion */
  if (name == NULL
      || sscanf (name, "recv_rtp_src_%d_%u_%d", &id, &ssrc, &pt) != 3)
    goto unknown_stream;

  GST_DEBUG_OBJECT (demux, "stream: %d, SSRC %u, PT %d", id, ssrc, pt);

  stream =
      find_stream (demux, GINT_TO_POINTER (id), (gpointer) find_stream_by_id);
  if (stream == NULL)
    goto unknown_stream;

  /* no need for a timeout anymore now */
  if (stream->udpsrc[0] != NULL)
    g_object_set (G_OBJECT (stream->udpsrc[0]), "timeout", (guint64) 0, NULL);

  /* create a new pad we will use to stream to */
  template = gst_static_pad_template_get (&rtptemplate);
  stream->srcpad = gst_ghost_pad_new_from_template (name, pad, template);
  gst_object_unref (template);
  g_free (name);

  stream->added = TRUE;
  gst_pad_set_active (stream->srcpad, TRUE);
  gst_element_add_pad (GST_ELEMENT_CAST (demux), stream->srcpad);

  /* check if we added all streams */
  all_added = TRUE;
  for (lstream = demux->streams; lstream; lstream = g_list_next (lstream)) {
    stream = (GstSDPStream *) lstream->data;
    /* a container stream only needs one pad added. Also disabled streams
     * don't count */
    if (!stream->container && !stream->disabled && !stream->added) {
      all_added = FALSE;
      break;
    }
  }
  GST_SDP_STREAM_UNLOCK (demux);

  if (all_added) {
    GST_DEBUG_OBJECT (demux, "We added all streams");
    /* when we get here, all stream are added and we can fire the
     * no-more-pads signal. */
    gst_element_no_more_pads (GST_ELEMENT_CAST (demux));
  }
  return;

  /* ERRORS */
unknown_stream:
  {
    GST_DEBUG_OBJECT (demux, "ignoring unknown stream");
    GST_SDP_STREAM_UNLOCK (demux);
    g_free (name);
    return;
  }
}
/* "request-pt-map" handler of the session manager: returns a new reference
 * to the caps of the stream whose id equals @session, or NULL when the
 * stream is unknown or has no caps. The lookup runs under the stream lock. */
static GstCaps *
request_pt_map (GstElement * sess, guint session, guint pt, GstSDPDemux * demux)
{
  GstSDPStream *match;
  GstCaps *result = NULL;

  GST_DEBUG_OBJECT (demux, "getting pt map for pt %d in session %d", pt,
      session);

  GST_SDP_STREAM_LOCK (demux);
  match =
      find_stream (demux, GINT_TO_POINTER (session),
      (gpointer) find_stream_by_id);
  if (match != NULL) {
    result = match->caps;
    if (result != NULL)
      gst_caps_ref (result);
  } else {
    GST_DEBUG_OBJECT (demux, "unknown stream %d", session);
  }
  GST_SDP_STREAM_UNLOCK (demux);

  return result;
}
/* Marks the stream that belongs to @session as EOS and pushes an EOS event
 * for it. Nothing happens when the session is unknown or the stream was
 * already marked EOS. */
static void
gst_sdp_demux_do_stream_eos (GstSDPDemux * demux, guint session)
{
  GstSDPStream *target;

  GST_DEBUG_OBJECT (demux, "setting stream for session %u to EOS", session);

  /* get stream for session */
  target =
      find_stream (demux, GINT_TO_POINTER (session),
      (gpointer) find_stream_by_id);

  if (target == NULL) {
    GST_DEBUG_OBJECT (demux, "unknown stream for session %u", session);
  } else if (target->eos) {
    GST_DEBUG_OBJECT (demux, "stream for session %u was already EOS", session);
  } else {
    target->eos = TRUE;
    gst_sdp_demux_stream_push_event (demux, target, gst_event_new_eos ());
  }
}
/* "on-bye-ssrc" handler of the session manager: a BYE for @ssrc ends the
 * stream of @session. */
static void
on_bye_ssrc (GstElement * manager, guint session, guint32 ssrc,
    GstSDPDemux * demux)
{
  GST_DEBUG_OBJECT (demux, "SSRC %08x in session %u received BYE", ssrc,
      session);

  gst_sdp_demux_do_stream_eos (demux, session);
}
/* Handler connected to both "on-bye-timeout" and "on-timeout" of the session
 * manager: a timed-out @ssrc ends the stream of @session. */
static void
on_timeout (GstElement * manager, guint session, guint32 ssrc,
    GstSDPDemux * demux)
{
  GST_DEBUG_OBJECT (demux, "SSRC %08x in session %u timed out", ssrc, session);

  gst_sdp_demux_do_stream_eos (demux, session);
}
/* Creates the "gstrtpbin" session manager, adds it to the bin, brings it to
 * PAUSED, applies the configured latency and connects the pad-added,
 * request-pt-map, BYE and timeout signal handlers.
 *
 * Returns FALSE when the element cannot be created or fails to reach PAUSED;
 * in the latter case it is removed again and demux->session is reset. */
static gboolean
gst_sdp_demux_configure_manager (GstSDPDemux * demux)
{
  GstElement *manager;

  /* configure the session manager */
  manager = gst_element_factory_make ("gstrtpbin", NULL);
  demux->session = manager;
  if (manager == NULL) {
    GST_DEBUG_OBJECT (demux, "no session manager element gstrtpbin found");
    return FALSE;
  }

  /* we manage this element */
  gst_bin_add (GST_BIN_CAST (demux), manager);

  if (gst_element_set_state (manager,
          GST_STATE_PAUSED) == GST_STATE_CHANGE_FAILURE) {
    GST_DEBUG_OBJECT (demux, "could not start session");
    gst_element_set_state (manager, GST_STATE_NULL);
    gst_bin_remove (GST_BIN_CAST (demux), manager);
    demux->session = NULL;
    return FALSE;
  }

  g_object_set (manager, "latency", demux->latency, NULL);

  GST_DEBUG_OBJECT (demux, "connect to signals on session manager");
  /* only the ids of these two handlers are remembered on the demuxer */
  demux->session_sig_id =
      g_signal_connect (manager, "pad-added",
      (GCallback) new_session_pad, demux);
  demux->session_ptmap_id =
      g_signal_connect (manager, "request-pt-map",
      (GCallback) request_pt_map, demux);

  g_signal_connect (manager, "on-bye-ssrc", (GCallback) on_bye_ssrc, demux);
  g_signal_connect (manager, "on-bye-timeout", (GCallback) on_timeout, demux);
  g_signal_connect (manager, "on-timeout", (GCallback) on_timeout, demux);

  return TRUE;
}
/* Creates the UDP sources of @stream (index 0 for RTP, index 1 for RTCP),
 * adds them to the bin, links them to the recv_rtp_sink_<id> and
 * recv_rtcp_sink_<id> request pads of the session manager and brings them to
 * PAUSED. A port of -1 disables the corresponding source.
 *
 * For multicast streams the sources listen on the stream destination,
 * otherwise on 0.0.0.0.
 *
 * Returns FALSE when a UDP source element cannot be created or when the pads
 * needed to link it to the session manager are not available. */
static gboolean
gst_sdp_demux_stream_configure_udp (GstSDPDemux * demux, GstSDPStream * stream)
{
  gchar *uri, *name;
  const gchar *destination;
  GstPad *pad;

  GST_DEBUG_OBJECT (demux, "creating UDP sources for multicast");

  /* if the destination is not a multicast address, we just want to listen on
   * our local ports */
  if (!stream->multicast)
    destination = "0.0.0.0";
  else
    destination = stream->destination;

  /* creating UDP source */
  if (stream->rtp_port != -1) {
    GST_DEBUG_OBJECT (demux, "receiving RTP from %s:%d", destination,
        stream->rtp_port);

    uri = g_strdup_printf ("udp://%s:%d", destination, stream->rtp_port);
    stream->udpsrc[0] = gst_element_make_from_uri (GST_URI_SRC, uri, NULL);
    g_free (uri);
    if (stream->udpsrc[0] == NULL)
      goto no_element;

    /* take ownership */
    gst_bin_add (GST_BIN_CAST (demux), stream->udpsrc[0]);

    /* the timeout is an unsigned 64-bit property value */
    GST_DEBUG_OBJECT (demux,
        "setting up UDP source with timeout %" G_GUINT64_FORMAT,
        demux->udp_timeout);

    /* configure a timeout on the UDP port. When the timeout message is
     * posted, we assume UDP transport is not possible. */
    g_object_set (G_OBJECT (stream->udpsrc[0]), "timeout", demux->udp_timeout,
        NULL);

    /* get output pad of the UDP source. */
    pad = gst_element_get_static_pad (stream->udpsrc[0], "src");

    name = g_strdup_printf ("recv_rtp_sink_%d", stream->id);
    stream->channelpad[0] = gst_element_get_request_pad (demux->session, name);
    g_free (name);

    /* without both pads there is nothing to link */
    if (pad == NULL || stream->channelpad[0] == NULL) {
      if (pad != NULL)
        gst_object_unref (pad);
      goto no_pad;
    }

    GST_DEBUG_OBJECT (demux, "connecting RTP source 0 to manager");
    /* configure for UDP delivery, we need to connect the UDP pads to
     * the session plugin. */
    gst_pad_link (pad, stream->channelpad[0]);
    gst_object_unref (pad);

    /* change state */
    gst_element_set_state (stream->udpsrc[0], GST_STATE_PAUSED);
  }

  /* creating another UDP source */
  if (stream->rtcp_port != -1) {
    GST_DEBUG_OBJECT (demux, "receiving RTCP from %s:%d", destination,
        stream->rtcp_port);

    uri = g_strdup_printf ("udp://%s:%d", destination, stream->rtcp_port);
    stream->udpsrc[1] = gst_element_make_from_uri (GST_URI_SRC, uri, NULL);
    g_free (uri);
    if (stream->udpsrc[1] == NULL)
      goto no_element;

    /* take ownership */
    gst_bin_add (GST_BIN_CAST (demux), stream->udpsrc[1]);

    GST_DEBUG_OBJECT (demux, "connecting RTCP source to manager");

    name = g_strdup_printf ("recv_rtcp_sink_%d", stream->id);
    stream->channelpad[1] = gst_element_get_request_pad (demux->session, name);
    g_free (name);

    pad = gst_element_get_static_pad (stream->udpsrc[1], "src");

    if (pad == NULL || stream->channelpad[1] == NULL) {
      if (pad != NULL)
        gst_object_unref (pad);
      goto no_pad;
    }

    gst_pad_link (pad, stream->channelpad[1]);
    gst_object_unref (pad);

    gst_element_set_state (stream->udpsrc[1], GST_STATE_PAUSED);
  }
  return TRUE;

  /* ERRORS */
no_element:
  {
    GST_DEBUG_OBJECT (demux, "no UDP source element found");
    return FALSE;
  }
no_pad:
  {
    /* sources created so far stay in stream->udpsrc[] and are released when
     * the stream is freed */
    GST_DEBUG_OBJECT (demux, "could not get pads to link UDP source");
    return FALSE;
  }
}
/* Creates the UDP sink that sends RTCP status reports for @stream to
 * <destination>:<rtcp_port>, lets it share the socket of the RTCP UDP source
 * when there is one, starts it in locked PLAYING state, adds it to the bin
 * and links it to the send_rtcp_src_<id> request pad of the session manager.
 *
 * Returns FALSE only when no UDP sink element can be created; a missing
 * session RTCP pad just produces a warning. */
static gboolean
gst_sdp_demux_stream_configure_udp_sink (GstSDPDemux * demux,
    GstSDPStream * stream)
{
  GstPad *rtcp_src, *sink_pad;
  gint port, sockfd = -1;
  gchar *destination, *uri, *padname;
  GstElement *sink;

  /* get destination and port */
  port = stream->rtcp_port;
  destination = stream->destination;

  GST_DEBUG_OBJECT (demux, "configure UDP sink for %s:%d", destination, port);

  uri = g_strdup_printf ("udp://%s:%d", destination, port);
  sink = gst_element_make_from_uri (GST_URI_SINK, uri, NULL);
  stream->udpsink = sink;
  g_free (uri);
  if (sink == NULL) {
    GST_DEBUG_OBJECT (demux, "no UDP sink element found");
    return FALSE;
  }

  /* we clear all destinations because we don't really know where to send the
   * RTCP to and we want to avoid sending it to our own ports.
   * FIXME when we get an RTCP packet from the sender, we could look at its
   * source port and address and try to send RTCP there. */
  if (!stream->multicast)
    g_signal_emit_by_name (sink, "clear");

  /* no sync needed */
  g_object_set (G_OBJECT (sink), "sync", FALSE, NULL);
  /* no async state changes needed */
  g_object_set (G_OBJECT (sink), "async", FALSE, NULL);

  if (stream->udpsrc[1] != NULL) {
    /* the sink gets the same UDP socket as the RTCP udpsrc because some
     * servers identify the RTCP packets they receive by the port number they
     * send their own RTCP to */
    g_object_get (G_OBJECT (stream->udpsrc[1]), "sock", &sockfd, NULL);
    GST_DEBUG_OBJECT (demux, "UDP src has sock %d", sockfd);
    /* the socket belongs to udpsrc, so udpsink must not close it when
     * shutting down */
    g_object_set (G_OBJECT (sink), "sockfd", sockfd, NULL);
    g_object_set (G_OBJECT (sink), "closefd", FALSE, NULL);
  }

  /* we keep this playing always */
  gst_element_set_locked_state (sink, TRUE);
  gst_element_set_state (sink, GST_STATE_PLAYING);

  gst_bin_add (GST_BIN_CAST (demux), sink);

  /* get session RTCP pad */
  padname = g_strdup_printf ("send_rtcp_src_%d", stream->id);
  rtcp_src = gst_element_get_request_pad (demux->session, padname);
  g_free (padname);

  /* and link */
  if (rtcp_src == NULL) {
    /* not very fatal, we just won't be able to send RTCP */
    GST_WARNING_OBJECT (demux, "could not get session RTCP pad");
  } else {
    sink_pad = gst_element_get_static_pad (sink, "sink");
    gst_pad_link (rtcp_src, sink_pad);
    gst_object_unref (sink_pad);
  }

  return TRUE;
}
/* Stores @ret as the last flow return of @stream and combines it with the
 * last flow returns of all streams: anything other than NOT_LINKED is
 * returned unchanged, and NOT_LINKED is only returned when every stream
 * reported NOT_LINKED; otherwise the first different value found wins. */
static GstFlowReturn
gst_sdp_demux_combine_flows (GstSDPDemux * demux, GstSDPStream * stream,
    GstFlowReturn ret)
{
  GList *item;

  /* store the value */
  stream->last_ret = ret;

  /* success and every error other than not-linked are passed on directly */
  if (GST_FLOW_IS_SUCCESS (ret) || ret != GST_FLOW_NOT_LINKED)
    return ret;

  /* only return NOT_LINKED if all other pads returned NOT_LINKED */
  for (item = demux->streams; item != NULL; item = g_list_next (item)) {
    GstSDPStream *other = (GstSDPStream *) item->data;

    ret = other->last_ret;
    /* some other return value (must be SUCCESS but we can return
     * other values as well) */
    if (ret != GST_FLOW_NOT_LINKED)
      break;
  }

  return ret;
}
/* Sends @event to both channel pads of @stream, provided the stream has a
 * source pad. Takes ownership of @event: each send gets its own reference
 * and the caller's reference is dropped at the end. */
static void
gst_sdp_demux_stream_push_event (GstSDPDemux * demux, GstSDPStream * stream,
    GstEvent * event)
{
  /* only streams that have a connection to the outside world */
  if (stream->srcpad != NULL) {
    gint n;

    for (n = 0; n < 2; n++) {
      GstPad *channel = stream->channelpad[n];

      if (channel == NULL)
        continue;

      gst_event_ref (event);
      gst_pad_send_event (channel, event);
    }
  }

  gst_event_unref (event);
}
/* GstBin message handler; it owns @message and must either forward it to the
 * parent class or unref it.
 *
 * - "GstUDPSrcTimeout" element messages: the first one is turned into an
 *   element error, later ones are ignored. The timeout message itself is
 *   always consumed here.
 * - Error messages from a stream's RTCP UDP source are dropped. Errors from
 *   an RTP UDP source are dropped as long as the combined flow of all
 *   streams is still OK. Everything else is forwarded.
 * - All other messages are forwarded to the parent class. */
static void
gst_sdp_demux_handle_message (GstBin * bin, GstMessage * message)
{
  GstSDPDemux *demux;

  demux = GST_SDP_DEMUX (bin);

  switch (GST_MESSAGE_TYPE (message)) {
    case GST_MESSAGE_ELEMENT:
    {
      const GstStructure *s = gst_message_get_structure (message);

      if (s != NULL && gst_structure_has_name (s, "GstUDPSrcTimeout")) {
        gboolean ignore_timeout;

        GST_DEBUG_OBJECT (bin, "timeout on UDP port");

        GST_OBJECT_LOCK (demux);
        ignore_timeout = demux->ignore_timeout;
        demux->ignore_timeout = TRUE;
        GST_OBJECT_UNLOCK (demux);

        /* we only act on the first udp timeout message, others are
         * irrelevant and can be ignored. The timeout is in microseconds;
         * convert to double first so the fraction of a second survives. */
        if (!ignore_timeout) {
          GST_ELEMENT_ERROR (demux, RESOURCE, READ, (NULL),
              ("Could not receive any UDP packets for %.4f seconds, maybe your "
                  "firewall is blocking it.",
                  gst_guint64_to_gdouble (demux->udp_timeout) / 1000000.0));
        }
        /* the message is not forwarded in either case, so release it */
        gst_message_unref (message);
        return;
      }
      GST_BIN_CLASS (parent_class)->handle_message (bin, message);
      break;
    }
    case GST_MESSAGE_ERROR:
    {
      GstObject *udpsrc;
      GstSDPStream *stream = NULL;
      gboolean drop = FALSE;

      udpsrc = GST_MESSAGE_SRC (message);

      GST_DEBUG_OBJECT (demux, "got error from %s",
          udpsrc ? GST_ELEMENT_NAME (udpsrc) : "(NULL)");

      /* a message without source can never belong to one of our UDP
       * sources; a NULL lookup would match streams with an unset udpsrc */
      if (udpsrc != NULL)
        stream = find_stream (demux, udpsrc, (gpointer) find_stream_by_udpsrc);

      if (stream != NULL) {
        if (stream->udpsrc[1] == GST_ELEMENT_CAST (udpsrc)) {
          /* we ignore the RTCP udpsrc */
          drop = TRUE;
        } else {
          GstFlowReturn ret;

          /* if we get error messages from the udp sources, that's not a
           * problem as long as not all of them error out. We also don't
           * really know what the problem is, the message does not give
           * enough detail... */
          ret =
              gst_sdp_demux_combine_flows (demux, stream, GST_FLOW_NOT_LINKED);
          GST_DEBUG_OBJECT (demux, "combined flows: %s",
              gst_flow_get_name (ret));
          drop = (ret == GST_FLOW_OK);
        }
      }

      if (drop)
        gst_message_unref (message);
      else
        /* fatal, or not our message: forward */
        GST_BIN_CLASS (parent_class)->handle_message (bin, message);
      break;
    }
    default:
    {
      GST_BIN_CLASS (parent_class)->handle_message (bin, message);
      break;
    }
  }
}
/* Parses the SDP collected in the adapter, creates the session manager and,
 * for every media that yields a stream, the UDP sources and the RTCP sink.
 * Finally the session manager and the UDP sources are brought to the target
 * state. Runs with the stream lock held.
 *
 * Returns TRUE on success. On failure an element error is posted and FALSE
 * is returned. Media for which no stream can be created are skipped. */
static gboolean
gst_sdp_demux_start (GstSDPDemux * demux)
{
  guint8 *data = NULL;
  guint size;
  gint i, n_streams;
  GstSDPMessage sdp = { 0 };
  GstSDPStream *stream = NULL;
  GList *walk;
  gboolean res = FALSE;

  /* grab the lock so that no state change can interfere */
  GST_SDP_STREAM_LOCK (demux);

  GST_DEBUG_OBJECT (demux, "parse SDP...");

  gst_sdp_message_init (&sdp);

  /* nothing received means there is nothing to parse */
  size = gst_adapter_available (demux->adapter);
  if (size == 0)
    goto could_not_parse;

  /* the taken data is owned by us and released at the end */
  data = gst_adapter_take (demux->adapter, size);

  if (gst_sdp_message_parse_buffer (data, size, &sdp) != GST_SDP_OK)
    goto could_not_parse;

  if (demux->debug)
    gst_sdp_message_dump (&sdp);

  /* try to get and configure a manager */
  if (!gst_sdp_demux_configure_manager (demux))
    goto no_manager;

  /* create streams with UDP sources and sinks */
  n_streams = gst_sdp_message_medias_len (&sdp);
  for (i = 0; i < n_streams; i++) {
    stream = gst_sdp_demux_create_stream (demux, &sdp, i);
    if (stream == NULL) {
      /* e.g. no connection information for this media */
      GST_WARNING_OBJECT (demux, "could not create stream %d, skipping", i);
      continue;
    }

    GST_DEBUG_OBJECT (demux, "configuring transport for stream %p", stream);

    if (!gst_sdp_demux_stream_configure_udp (demux, stream))
      goto transport_failed;
    if (!gst_sdp_demux_stream_configure_udp_sink (demux, stream))
      goto transport_failed;
  }

  /* set target state on session manager */
  gst_element_set_state (demux->session, demux->target);

  /* activate all streams */
  for (walk = demux->streams; walk; walk = g_list_next (walk)) {
    stream = (GstSDPStream *) walk->data;

    /* configure target state on udp sources; a source is absent when its
     * port was disabled */
    if (stream->udpsrc[0] != NULL)
      gst_element_set_state (stream->udpsrc[0], demux->target);
    if (stream->udpsrc[1] != NULL)
      gst_element_set_state (stream->udpsrc[1], demux->target);
  }

  /* NOTE: the SDP message is deliberately not uninitialised here, the
   * streams keep pointing at its connection address (stream->destination) */
  res = TRUE;
  goto done;

  /* ERRORS */
transport_failed:
  {
    GST_ELEMENT_ERROR (demux, STREAM, TYPE_NOT_FOUND, (NULL),
        ("Could not create RTP stream transport."));
    goto done;
  }
no_manager:
  {
    GST_ELEMENT_ERROR (demux, STREAM, TYPE_NOT_FOUND, (NULL),
        ("Could not create RTP session manager."));
    goto done;
  }
could_not_parse:
  {
    gst_sdp_message_uninit (&sdp);
    GST_ELEMENT_ERROR (demux, STREAM, TYPE_NOT_FOUND, (NULL),
        ("Could not parse SDP message."));
    goto done;
  }
done:
  {
    g_free (data);
    GST_SDP_STREAM_UNLOCK (demux);
    return res;
  }
}
/* Sink pad event handler. EOS marks the end of the SDP document and starts
 * the parsing and stream setup; its result is the return value. All other
 * events are dropped and reported as handled. The event is consumed in
 * every case. */
static gboolean
gst_sdp_demux_sink_event (GstPad * pad, GstEvent * event)
{
  GstSDPDemux *self;
  gboolean handled = TRUE;

  self = GST_SDP_DEMUX (gst_pad_get_parent (pad));

  /* when we get EOS, start parsing the SDP */
  if (GST_EVENT_TYPE (event) == GST_EVENT_EOS)
    handled = gst_sdp_demux_start (self);

  gst_event_unref (event);
  gst_object_unref (self);

  return handled;
}
/* Sink pad chain function: collects @buffer in the adapter. The accumulated
 * SDP is only processed once EOS arrives. Always returns GST_FLOW_OK. */
static GstFlowReturn
gst_sdp_demux_sink_chain (GstPad * pad, GstBuffer * buffer)
{
  GstSDPDemux *self = GST_SDP_DEMUX (gst_pad_get_parent (pad));

  /* the buffer is handed over to the adapter */
  gst_adapter_push (self->adapter, buffer);

  gst_object_unref (self);

  return GST_FLOW_OK;
}
/* Element state-change handler, run with the stream lock held.
 *
 * Before chaining up, READY->PAUSED clears the adapter, re-arms the UDP
 * timeout handling and sets the target state to PAUSED, while
 * PAUSED->PLAYING sets the target state to PLAYING. After a successful
 * parent state change, READY->PAUSED and PLAYING->PAUSED report NO_PREROLL
 * and PAUSED->READY tears down all streams and the session manager. */
static GstStateChangeReturn
gst_sdp_demux_change_state (GstElement * element, GstStateChange transition)
{
  GstSDPDemux *self = GST_SDP_DEMUX (element);
  GstStateChangeReturn result;

  GST_SDP_STREAM_LOCK (self);

  /* upward transitions are prepared before the parent class runs */
  if (transition == GST_STATE_CHANGE_READY_TO_PAUSED) {
    /* first attempt, don't ignore timeouts */
    gst_adapter_clear (self->adapter);
    self->ignore_timeout = FALSE;
    self->target = GST_STATE_PAUSED;
  } else if (transition == GST_STATE_CHANGE_PAUSED_TO_PLAYING) {
    self->target = GST_STATE_PLAYING;
  }

  result = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);

  if (result != GST_STATE_CHANGE_FAILURE) {
    if (transition == GST_STATE_CHANGE_READY_TO_PAUSED) {
      result = GST_STATE_CHANGE_NO_PREROLL;
    } else if (transition == GST_STATE_CHANGE_PLAYING_TO_PAUSED) {
      result = GST_STATE_CHANGE_NO_PREROLL;
      self->target = GST_STATE_PAUSED;
    } else if (transition == GST_STATE_CHANGE_PAUSED_TO_READY) {
      gst_sdp_demux_cleanup (self);
    }
  }

  GST_SDP_STREAM_UNLOCK (self);

  return result;
}