gstreamer/gst/udp/gstudpsrc.c
Peter Kjellerstedt 7c56695160 udpsrc: Make sure the sockaddr length used for recvfrom() is big enough.
Previously the sockaddr length used for recvfrom() was calculated as
sizeof (struct sockaddr). However, this is too little to hold an IPv6
address, so the full size of the gst_sockaddr union should be used
instead.
2009-02-25 12:52:37 +01:00

1040 lines
31 KiB
C

/* GStreamer
* Copyright (C) <2005> Wim Taymans <wim@fluendo.com>
* Copyright (C) <2005> Nokia Corporation <kai.vehmanen@nokia.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 59 Temple Place - Suite 330,
* Boston, MA 02111-1307, USA.
*/
/**
* SECTION:element-udpsrc
* @see_also: udpsink, multifdsink
*
* udpsrc is a network source that reads UDP packets from the network.
* It can be combined with RTP depayloaders to implement RTP streaming.
*
* The udpsrc element supports automatic port allocation by setting the
* #GstUDPSrc:port property to 0. After setting the udpsrc to PAUSED, the
* allocated port can be obtained by reading the port property.
*
* udpsrc can read from multicast groups by setting the #GstUDPSrc:multicast-group
* property to the IP address of the multicast group.
*
* Alternatively one can provide a custom socket to udpsrc with the #GstUDPSrc:sockfd
* property, udpsrc will then not allocate a socket itself but use the provided
* one.
*
* The #GstUDPSrc:caps property is mainly used to give a type to the UDP packet
* so that they can be autoplugged in GStreamer pipelines. This is very usefull
* for RTP implementations where the contents of the UDP packets is transfered
* out-of-bounds using SDP or other means.
*
* The #GstUDPSrc:buffer property is used to change the default kernel buffer
* sizes used for receiving packets. The buffer size may be increased for
* high-volume connections, or may be decreased to limit the possible backlog of
* incoming data. The system places an absolute limit on these values, on Linux,
* for example, the default buffer size is typically 50K and can be increased to
* maximally 100K.
*
* The #GstUDPSrc:skip-first-bytes property is used to strip off an arbitrary
* number of bytes from the start of the raw udp packet and can be used to strip
* off proprietary header, for example.
*
* The udpsrc is always a live source. It does however not provide a #GstClock,
* this is left for upstream elements such as an RTP session manager or demuxer
* (such as an MPEG demuxer). As with all live sources, the captured buffers
* will have their timestamp set to the current running time of the pipeline.
*
* udpsrc implements a #GstURIHandler interface that handles udp://host:port
* type URIs.
*
* If the #GstUDPSrc:timeout property is set to a value bigger than 0, udpsrc
* will generate an element message named
* <classname>&quot;GstUDPSrcTimeout&quot;</classname>
* if no data was recieved in the given timeout.
* The message's structure contains one field:
* <itemizedlist>
* <listitem>
* <para>
* #guint64
* <classname>&quot;timeout&quot;</classname>: the timeout in microseconds that
* expired when waiting for data.
* </para>
* </listitem>
* </itemizedlist>
* The message is typically used to detect that no UDP arrives in the receiver
* because it is blocked by a firewall.
* </para>
* <para>
* A custom file descriptor can be configured with the
* #GstUDPSrc:sockfd property. The socket will be closed when setting the
* element to READY by default. This behaviour can be
* overriden with the #GstUDPSrc:closefd property, in which case the application
* is responsible for closing the file descriptor.
*
* <refsect2>
* <title>Examples</title>
* |[
* gst-launch -v udpsrc ! fakesink dump=1
* ]| A pipeline to read from the default port and dump the udp packets.
* To actually generate udp packets on the default port one can use the
* udpsink element. When running the following pipeline in another terminal, the
* above mentioned pipeline should dump data packets to the console.
* |[
* gst-launch -v audiotestsrc ! udpsink
* ]|
* |[
* gst-launch -v udpsrc port=0 ! fakesink
* ]| read udp packets from a free port.
* </refsect2>
*
* Last reviewed on 2007-09-20 (0.10.7)
*/
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#include "gstudpsrc.h"
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif
#include <stdlib.h>
#if defined _MSC_VER && (_MSC_VER >= 1400)
#include <io.h>
#endif
#include <gst/netbuffer/gstnetbuffer.h>
#ifdef G_OS_WIN32
typedef int socklen_t;
#endif
#ifdef HAVE_FIONREAD_IN_SYS_FILIO
#include <sys/filio.h>
#endif
GST_DEBUG_CATEGORY_STATIC (udpsrc_debug);
#define GST_CAT_DEFAULT (udpsrc_debug)
static GstStaticPadTemplate src_template = GST_STATIC_PAD_TEMPLATE ("src",
GST_PAD_SRC,
GST_PAD_ALWAYS,
GST_STATIC_CAPS_ANY);
static const GstElementDetails gst_udpsrc_details =
GST_ELEMENT_DETAILS ("UDP packet receiver",
"Source/Network",
"Receive data over the network via UDP",
"Wim Taymans <wim@fluendo.com>\n"
"Thijs Vermeir <thijs.vermeir@barco.com>");
#define UDP_DEFAULT_PORT 4951
#define UDP_DEFAULT_MULTICAST_GROUP "0.0.0.0"
#define UDP_DEFAULT_URI "udp://"UDP_DEFAULT_MULTICAST_GROUP":"G_STRINGIFY(UDP_DEFAULT_PORT)
#define UDP_DEFAULT_CAPS NULL
#define UDP_DEFAULT_SOCKFD -1
#define UDP_DEFAULT_BUFFER_SIZE 0
#define UDP_DEFAULT_TIMEOUT 0
#define UDP_DEFAULT_SKIP_FIRST_BYTES 0
#define UDP_DEFAULT_CLOSEFD TRUE
#define UDP_DEFAULT_SOCK -1
#define UDP_DEFAULT_AUTO_MULTICAST TRUE
enum
{
PROP_0,
PROP_PORT,
PROP_MULTICAST_GROUP,
PROP_URI,
PROP_CAPS,
PROP_SOCKFD,
PROP_BUFFER_SIZE,
PROP_TIMEOUT,
PROP_SKIP_FIRST_BYTES,
PROP_CLOSEFD,
PROP_SOCK,
PROP_AUTO_MULTICAST,
PROP_LAST
};
#define CLOSE_IF_REQUESTED(udpctx) \
G_STMT_START { \
if ((!udpctx->externalfd) || (udpctx->externalfd && udpctx->closefd)) { \
CLOSE_SOCKET(udpctx->sock.fd); \
if (udpctx->sock.fd == udpctx->sockfd) \
udpctx->sockfd = UDP_DEFAULT_SOCKFD; \
} \
udpctx->sock.fd = UDP_DEFAULT_SOCK; \
} G_STMT_END
static void gst_udpsrc_uri_handler_init (gpointer g_iface, gpointer iface_data);
static GstCaps *gst_udpsrc_getcaps (GstBaseSrc * src);
static GstFlowReturn gst_udpsrc_create (GstPushSrc * psrc, GstBuffer ** buf);
static gboolean gst_udpsrc_start (GstBaseSrc * bsrc);
static gboolean gst_udpsrc_stop (GstBaseSrc * bsrc);
static gboolean gst_udpsrc_unlock (GstBaseSrc * bsrc);
static gboolean gst_udpsrc_unlock_stop (GstBaseSrc * bsrc);
static void gst_udpsrc_finalize (GObject * object);
static void gst_udpsrc_set_property (GObject * object, guint prop_id,
const GValue * value, GParamSpec * pspec);
static void gst_udpsrc_get_property (GObject * object, guint prop_id,
GValue * value, GParamSpec * pspec);
static void
_do_init (GType type)
{
static const GInterfaceInfo urihandler_info = {
gst_udpsrc_uri_handler_init,
NULL,
NULL
};
g_type_add_interface_static (type, GST_TYPE_URI_HANDLER, &urihandler_info);
GST_DEBUG_CATEGORY_INIT (udpsrc_debug, "udpsrc", 0, "UDP src");
}
GST_BOILERPLATE_FULL (GstUDPSrc, gst_udpsrc, GstPushSrc, GST_TYPE_PUSH_SRC,
_do_init);
static void
gst_udpsrc_base_init (gpointer g_class)
{
GstElementClass *element_class = GST_ELEMENT_CLASS (g_class);
gst_element_class_add_pad_template (element_class,
gst_static_pad_template_get (&src_template));
gst_element_class_set_details (element_class, &gst_udpsrc_details);
}
static void
gst_udpsrc_class_init (GstUDPSrcClass * klass)
{
GObjectClass *gobject_class;
GstBaseSrcClass *gstbasesrc_class;
GstPushSrcClass *gstpushsrc_class;
gobject_class = (GObjectClass *) klass;
gstbasesrc_class = (GstBaseSrcClass *) klass;
gstpushsrc_class = (GstPushSrcClass *) klass;
gobject_class->set_property = gst_udpsrc_set_property;
gobject_class->get_property = gst_udpsrc_get_property;
gobject_class->finalize = gst_udpsrc_finalize;
g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_PORT,
g_param_spec_int ("port", "Port",
"The port to receive the packets from, 0=allocate", 0, G_MAXUINT16,
UDP_DEFAULT_PORT, G_PARAM_READWRITE));
g_object_class_install_property (gobject_class, PROP_MULTICAST_GROUP,
g_param_spec_string ("multicast_group", "Multicast Group",
"The Address of multicast group to join", UDP_DEFAULT_MULTICAST_GROUP,
G_PARAM_READWRITE));
g_object_class_install_property (gobject_class, PROP_URI,
g_param_spec_string ("uri", "URI",
"URI in the form of udp://multicast_group:port", UDP_DEFAULT_URI,
G_PARAM_READWRITE));
g_object_class_install_property (gobject_class, PROP_CAPS,
g_param_spec_boxed ("caps", "Caps",
"The caps of the source pad", GST_TYPE_CAPS, G_PARAM_READWRITE));
g_object_class_install_property (gobject_class, PROP_SOCKFD,
g_param_spec_int ("sockfd", "Socket Handle",
"Socket to use for UDP reception. (-1 == allocate)",
-1, G_MAXINT, UDP_DEFAULT_SOCKFD, G_PARAM_READWRITE));
g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_BUFFER_SIZE,
g_param_spec_int ("buffer-size", "Buffer Size",
"Size of the kernel receive buffer in bytes, 0=default", 0, G_MAXINT,
UDP_DEFAULT_BUFFER_SIZE, G_PARAM_READWRITE));
g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_TIMEOUT,
g_param_spec_uint64 ("timeout", "Timeout",
"Post a message after timeout microseconds (0 = disabled)", 0,
G_MAXUINT64, UDP_DEFAULT_TIMEOUT, G_PARAM_READWRITE));
g_object_class_install_property (G_OBJECT_CLASS (klass),
PROP_SKIP_FIRST_BYTES, g_param_spec_int ("skip-first-bytes",
"Skip first bytes", "number of bytes to skip for each udp packet", 0,
G_MAXINT, UDP_DEFAULT_SKIP_FIRST_BYTES, G_PARAM_READWRITE));
g_object_class_install_property (gobject_class, PROP_CLOSEFD,
g_param_spec_boolean ("closefd", "Close sockfd",
"Close sockfd if passed as property on state change",
UDP_DEFAULT_CLOSEFD, G_PARAM_READWRITE));
g_object_class_install_property (gobject_class, PROP_SOCK,
g_param_spec_int ("sock", "Socket Handle",
"Socket currently in use for UDP reception. (-1 = no socket)",
-1, G_MAXINT, UDP_DEFAULT_SOCK, G_PARAM_READABLE));
g_object_class_install_property (gobject_class, PROP_AUTO_MULTICAST,
g_param_spec_boolean ("auto-multicast", "Auto Multicast",
"Automatically join/leave multicast groups",
UDP_DEFAULT_AUTO_MULTICAST, G_PARAM_READWRITE));
gstbasesrc_class->start = gst_udpsrc_start;
gstbasesrc_class->stop = gst_udpsrc_stop;
gstbasesrc_class->unlock = gst_udpsrc_unlock;
gstbasesrc_class->unlock_stop = gst_udpsrc_unlock_stop;
gstbasesrc_class->get_caps = gst_udpsrc_getcaps;
gstpushsrc_class->create = gst_udpsrc_create;
}
static void
gst_udpsrc_init (GstUDPSrc * udpsrc, GstUDPSrcClass * g_class)
{
WSA_STARTUP (udpsrc);
udpsrc->port = UDP_DEFAULT_PORT;
udpsrc->sockfd = UDP_DEFAULT_SOCKFD;
udpsrc->multi_group = g_strdup (UDP_DEFAULT_MULTICAST_GROUP);
udpsrc->uri = g_strdup (UDP_DEFAULT_URI);
udpsrc->buffer_size = UDP_DEFAULT_BUFFER_SIZE;
udpsrc->timeout = UDP_DEFAULT_TIMEOUT;
udpsrc->skip_first_bytes = UDP_DEFAULT_SKIP_FIRST_BYTES;
udpsrc->closefd = UDP_DEFAULT_CLOSEFD;
udpsrc->externalfd = (udpsrc->sockfd != -1);
udpsrc->auto_multicast = UDP_DEFAULT_AUTO_MULTICAST;
udpsrc->sock.fd = UDP_DEFAULT_SOCK;
/* configure basesrc to be a live source */
gst_base_src_set_live (GST_BASE_SRC (udpsrc), TRUE);
/* make basesrc output a segment in time */
gst_base_src_set_format (GST_BASE_SRC (udpsrc), GST_FORMAT_TIME);
/* make basesrc set timestamps on outgoing buffers based on the running_time
* when they were captured */
gst_base_src_set_do_timestamp (GST_BASE_SRC (udpsrc), TRUE);
}
static void
gst_udpsrc_finalize (GObject * object)
{
GstUDPSrc *udpsrc;
udpsrc = GST_UDPSRC (object);
if (udpsrc->caps)
gst_caps_unref (udpsrc->caps);
g_free (udpsrc->multi_group);
g_free (udpsrc->uri);
if (udpsrc->sockfd >= 0 && udpsrc->closefd)
CLOSE_SOCKET (udpsrc->sockfd);
WSA_CLEANUP (object);
G_OBJECT_CLASS (parent_class)->finalize (object);
}
static GstCaps *
gst_udpsrc_getcaps (GstBaseSrc * src)
{
GstUDPSrc *udpsrc;
udpsrc = GST_UDPSRC (src);
if (udpsrc->caps)
return gst_caps_ref (udpsrc->caps);
else
return gst_caps_new_any ();
}
/* read a message from the error queue */
static void
clear_error (GstUDPSrc * udpsrc)
{
struct msghdr cmsg;
char cbuf[128];
char msgbuf[CMSG_SPACE (128)];
struct iovec iov;
/* Flush ERRORS from fd so next poll will not return at once */
/* No need for address : We look for local error */
cmsg.msg_name = NULL;
cmsg.msg_namelen = 0;
/* IOV */
memset (&cbuf, 0, sizeof (cbuf));
iov.iov_base = cbuf;
iov.iov_len = sizeof (cbuf);
cmsg.msg_iov = &iov;
cmsg.msg_iovlen = 1;
/* msg_control */
memset (&msgbuf, 0, sizeof (msgbuf));
cmsg.msg_control = &msgbuf;
cmsg.msg_controllen = sizeof (msgbuf);
recvmsg (udpsrc->sock.fd, &cmsg, MSG_ERRQUEUE);
}
static GstFlowReturn
gst_udpsrc_create (GstPushSrc * psrc, GstBuffer ** buf)
{
GstUDPSrc *udpsrc;
GstNetBuffer *outbuf;
union gst_sockaddr
{
struct sockaddr sa;
struct sockaddr_in sa_in;
struct sockaddr_in6 sa_in6;
struct sockaddr_storage sa_stor;
} sa;
socklen_t slen;
guint8 *pktdata;
gint pktsize;
#ifdef G_OS_UNIX
gint readsize;
#elif defined G_OS_WIN32
gulong readsize;
#endif
GstClockTime timeout;
gint ret;
gboolean try_again;
udpsrc = GST_UDPSRC_CAST (psrc);
retry:
/* quick check, avoid going in select when we already have data */
readsize = 0;
if (G_UNLIKELY ((ret =
IOCTL_SOCKET (udpsrc->sock.fd, FIONREAD, &readsize)) < 0))
goto ioctl_failed;
if (readsize > 0)
goto no_select;
if (udpsrc->timeout > 0) {
timeout = udpsrc->timeout * GST_USECOND;
} else {
timeout = GST_CLOCK_TIME_NONE;
}
do {
try_again = FALSE;
GST_LOG_OBJECT (udpsrc, "doing select, timeout %" G_GUINT64_FORMAT,
udpsrc->timeout);
ret = gst_poll_wait (udpsrc->fdset, timeout);
GST_LOG_OBJECT (udpsrc, "select returned %d", ret);
if (G_UNLIKELY (ret < 0)) {
if (errno == EBUSY)
goto stopped;
#ifdef G_OS_WIN32
if (WSAGetLastError () != WSAEINTR)
goto select_error;
#else
if (errno != EAGAIN && errno != EINTR)
goto select_error;
#endif
try_again = TRUE;
} else if (G_UNLIKELY (ret == 0)) {
/* timeout, post element message */
gst_element_post_message (GST_ELEMENT_CAST (udpsrc),
gst_message_new_element (GST_OBJECT_CAST (udpsrc),
gst_structure_new ("GstUDPSrcTimeout",
"timeout", G_TYPE_UINT64, udpsrc->timeout, NULL)));
try_again = TRUE;
}
} while (G_UNLIKELY (try_again));
/* ask how much is available for reading on the socket, this should be exactly
* one UDP packet. We will check the return value, though, because in some
* case it can return 0 and we don't want a 0 sized buffer. */
readsize = 0;
if (G_UNLIKELY ((ret =
IOCTL_SOCKET (udpsrc->sock.fd, FIONREAD, &readsize)) < 0))
goto ioctl_failed;
/* if we get here and there is nothing to read from the socket, the select got
* woken up by activity on the socket but it was not a read. We know someone
* will also do something with the socket so that we don't go into an infinite
* loop in the select(). */
if (G_UNLIKELY (!readsize)) {
clear_error (udpsrc);
goto retry;
}
no_select:
GST_LOG_OBJECT (udpsrc, "ioctl says %d bytes available", (int) readsize);
pktdata = g_malloc (readsize);
pktsize = readsize;
while (TRUE) {
slen = sizeof (sa);
#ifdef G_OS_WIN32
ret = recvfrom (udpsrc->sock.fd, (char *) pktdata, pktsize, 0, &sa.sa,
&slen);
#else
ret = recvfrom (udpsrc->sock.fd, pktdata, pktsize, 0, &sa.sa, &slen);
#endif
if (G_UNLIKELY (ret < 0)) {
#ifdef G_OS_WIN32
/* WSAECONNRESET for a UDP socket means that a packet sent with udpsink
* generated a "port unreachable" ICMP response. We ignore that and try
* again. */
if (WSAGetLastError () == WSAECONNRESET) {
g_free (pktdata);
pktdata = NULL;
goto retry;
}
if (WSAGetLastError () != WSAEINTR)
goto receive_error;
#else
if (errno != EAGAIN && errno != EINTR)
goto receive_error;
#endif
} else
break;
}
/* special case buffer so receivers can also track the address */
outbuf = gst_netbuffer_new ();
GST_BUFFER_MALLOCDATA (outbuf) = pktdata;
/* patch pktdata and len when stripping off the headers */
if (G_UNLIKELY (udpsrc->skip_first_bytes != 0)) {
if (G_UNLIKELY (readsize <= udpsrc->skip_first_bytes))
goto skip_error;
pktdata += udpsrc->skip_first_bytes;
ret -= udpsrc->skip_first_bytes;
}
GST_BUFFER_DATA (outbuf) = pktdata;
GST_BUFFER_SIZE (outbuf) = ret;
switch (sa.sa.sa_family) {
case AF_INET:
{
gst_netaddress_set_ip4_address (&outbuf->from, sa.sa_in.sin_addr.s_addr,
sa.sa_in.sin_port);
}
break;
case AF_INET6:
{
guint8 ip6[16];
memcpy (ip6, &sa.sa_in6.sin6_addr, sizeof (ip6));
gst_netaddress_set_ip6_address (&outbuf->from, ip6, sa.sa_in6.sin6_port);
}
break;
default:
#ifdef G_OS_WIN32
WSASetLastError (WSAEAFNOSUPPORT);
#else
errno = EAFNOSUPPORT;
#endif
goto receive_error;
}
GST_LOG_OBJECT (udpsrc, "read %d bytes", (int) readsize);
*buf = GST_BUFFER_CAST (outbuf);
return GST_FLOW_OK;
/* ERRORS */
select_error:
{
GST_ELEMENT_ERROR (udpsrc, RESOURCE, READ, (NULL),
("select error %d: %s (%d)", ret, g_strerror (errno), errno));
return GST_FLOW_ERROR;
}
stopped:
{
GST_DEBUG ("stop called");
return GST_FLOW_WRONG_STATE;
}
ioctl_failed:
{
GST_ELEMENT_ERROR (udpsrc, RESOURCE, READ, (NULL),
("ioctl failed %d: %s (%d)", ret, g_strerror (errno), errno));
return GST_FLOW_ERROR;
}
receive_error:
{
g_free (pktdata);
#ifdef G_OS_WIN32
GST_ELEMENT_ERROR (udpsrc, RESOURCE, READ, (NULL),
("receive error %d (WSA error: %d)", ret, WSAGetLastError ()));
#else
GST_ELEMENT_ERROR (udpsrc, RESOURCE, READ, (NULL),
("receive error %d: %s (%d)", ret, g_strerror (errno), errno));
#endif
return GST_FLOW_ERROR;
}
skip_error:
{
GST_ELEMENT_ERROR (udpsrc, STREAM, DECODE, (NULL),
("UDP buffer to small to skip header"));
return GST_FLOW_ERROR;
}
}
/* Call this function when multicastgroup and/or port are updated */
static void
gst_udpsrc_update_uri (GstUDPSrc * src)
{
g_free (src->uri);
src->uri = g_strdup_printf ("udp://%s:%d", src->multi_group, src->port);
GST_DEBUG_OBJECT (src, "updated uri to %s", src->uri);
}
static gboolean
gst_udpsrc_set_uri (GstUDPSrc * src, const gchar * uri)
{
gchar *protocol;
gchar *location;
gchar *colptr;
protocol = gst_uri_get_protocol (uri);
if (strcmp (protocol, "udp") != 0)
goto wrong_protocol;
g_free (protocol);
location = gst_uri_get_location (uri);
if (!location)
return FALSE;
colptr = strrchr (location, ':');
if (colptr != NULL) {
g_free (src->multi_group);
src->multi_group = g_strndup (location, colptr - location);
src->port = atoi (colptr + 1);
} else {
g_free (src->multi_group);
src->multi_group = g_strdup (location);
src->port = UDP_DEFAULT_PORT;
}
g_free (location);
gst_udpsrc_update_uri (src);
return TRUE;
/* ERRORS */
wrong_protocol:
{
g_free (protocol);
GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
("error parsing uri %s: wrong protocol (%s != udp)", uri, protocol));
return FALSE;
}
}
static void
gst_udpsrc_set_property (GObject * object, guint prop_id, const GValue * value,
GParamSpec * pspec)
{
GstUDPSrc *udpsrc = GST_UDPSRC (object);
switch (prop_id) {
case PROP_BUFFER_SIZE:
udpsrc->buffer_size = g_value_get_int (value);
break;
case PROP_PORT:
udpsrc->port = g_value_get_int (value);
gst_udpsrc_update_uri (udpsrc);
break;
case PROP_MULTICAST_GROUP:
g_free (udpsrc->multi_group);
if (g_value_get_string (value) == NULL)
udpsrc->multi_group = g_strdup (UDP_DEFAULT_MULTICAST_GROUP);
else
udpsrc->multi_group = g_value_dup_string (value);
gst_udpsrc_update_uri (udpsrc);
break;
case PROP_URI:
gst_udpsrc_set_uri (udpsrc, g_value_get_string (value));
break;
case PROP_CAPS:
{
const GstCaps *new_caps_val = gst_value_get_caps (value);
GstCaps *new_caps;
GstCaps *old_caps;
if (new_caps_val == NULL) {
new_caps = gst_caps_new_any ();
} else {
new_caps = gst_caps_copy (new_caps_val);
}
old_caps = udpsrc->caps;
udpsrc->caps = new_caps;
if (old_caps)
gst_caps_unref (old_caps);
gst_pad_set_caps (GST_BASE_SRC (udpsrc)->srcpad, new_caps);
break;
}
case PROP_SOCKFD:
if (udpsrc->sockfd >= 0 && udpsrc->sockfd != udpsrc->sock.fd &&
udpsrc->closefd)
CLOSE_SOCKET (udpsrc->sockfd);
udpsrc->sockfd = g_value_get_int (value);
GST_DEBUG ("setting SOCKFD to %d", udpsrc->sockfd);
break;
case PROP_TIMEOUT:
udpsrc->timeout = g_value_get_uint64 (value);
break;
case PROP_SKIP_FIRST_BYTES:
udpsrc->skip_first_bytes = g_value_get_int (value);
break;
case PROP_CLOSEFD:
udpsrc->closefd = g_value_get_boolean (value);
break;
case PROP_AUTO_MULTICAST:
udpsrc->auto_multicast = g_value_get_boolean (value);
break;
default:
break;
}
}
static void
gst_udpsrc_get_property (GObject * object, guint prop_id, GValue * value,
GParamSpec * pspec)
{
GstUDPSrc *udpsrc = GST_UDPSRC (object);
switch (prop_id) {
case PROP_BUFFER_SIZE:
g_value_set_int (value, udpsrc->buffer_size);
break;
case PROP_PORT:
g_value_set_int (value, udpsrc->port);
break;
case PROP_MULTICAST_GROUP:
g_value_set_string (value, udpsrc->multi_group);
break;
case PROP_URI:
g_value_set_string (value, udpsrc->uri);
break;
case PROP_CAPS:
gst_value_set_caps (value, udpsrc->caps);
break;
case PROP_SOCKFD:
g_value_set_int (value, udpsrc->sockfd);
break;
case PROP_TIMEOUT:
g_value_set_uint64 (value, udpsrc->timeout);
break;
case PROP_SKIP_FIRST_BYTES:
g_value_set_int (value, udpsrc->skip_first_bytes);
break;
case PROP_CLOSEFD:
g_value_set_boolean (value, udpsrc->closefd);
break;
case PROP_SOCK:
g_value_set_int (value, udpsrc->sock.fd);
break;
case PROP_AUTO_MULTICAST:
g_value_set_boolean (value, udpsrc->auto_multicast);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
}
}
/* create a socket for sending to remote machine */
static gboolean
gst_udpsrc_start (GstBaseSrc * bsrc)
{
guint bc_val;
guint err_val;
gint reuse;
int port;
GstUDPSrc *src;
gint ret;
int rcvsize;
#ifdef G_OS_WIN32
gint len;
#else
guint len;
#endif
src = GST_UDPSRC (bsrc);
if (src->sockfd == -1) {
/* need to allocate a socket */
GST_DEBUG_OBJECT (src, "allocating socket for %s:%d", src->multi_group,
src->port);
if ((ret =
gst_udp_get_addr (src->multi_group, src->port, &src->myaddr)) < 0)
goto getaddrinfo_error;
if ((ret = socket (src->myaddr.ss_family, SOCK_DGRAM, IPPROTO_UDP)) < 0)
goto no_socket;
src->sock.fd = ret;
src->externalfd = FALSE;
reuse = 1;
if ((ret =
setsockopt (src->sock.fd, SOL_SOCKET, SO_REUSEADDR, &reuse,
sizeof (reuse))) < 0)
goto setsockopt_error;
GST_DEBUG_OBJECT (src, "binding on port %d", src->port);
len = gst_udp_get_sockaddr_length (&src->myaddr);
if ((ret = bind (src->sock.fd, (struct sockaddr *) &src->myaddr, len)) < 0)
goto bind_error;
len = sizeof (src->myaddr);
if ((ret = getsockname (src->sock.fd, (struct sockaddr *) &src->myaddr,
&len)) < 0)
goto getsockname_error;
} else {
GST_DEBUG_OBJECT (src, "using provided socket %d", src->sockfd);
/* we use the configured socket, try to get some info about it */
len = sizeof (src->myaddr);
if ((ret =
getsockname (src->sockfd, (struct sockaddr *) &src->myaddr,
&len)) < 0)
goto getsockname_error;
src->sock.fd = src->sockfd;
src->externalfd = TRUE;
}
len = sizeof (rcvsize);
if (src->buffer_size != 0) {
rcvsize = src->buffer_size;
GST_DEBUG_OBJECT (src, "setting udp buffer of %d bytes", rcvsize);
/* set buffer size, Note that on Linux this is typically limited to a
* maximum of around 100K. Also a minimum of 128 bytes is required on
* Linux. */
ret =
setsockopt (src->sock.fd, SOL_SOCKET, SO_RCVBUF, (void *) &rcvsize,
len);
if (ret != 0) {
GST_ELEMENT_WARNING (src, RESOURCE, SETTINGS, (NULL),
("Could not create a buffer of requested %d bytes, %d: %s (%d)",
rcvsize, ret, g_strerror (errno), errno));
}
}
/* read the value of the receive buffer. Note that on linux this returns 2x the
* value we set because the kernel allocates extra memory for metadata.
* The default on Linux is about 100K (which is about 50K without metadata) */
ret =
getsockopt (src->sock.fd, SOL_SOCKET, SO_RCVBUF, (void *) &rcvsize, &len);
if (ret == 0)
GST_DEBUG_OBJECT (src, "have udp buffer of %d bytes", rcvsize);
else
GST_DEBUG_OBJECT (src, "could not get udp buffer size");
bc_val = 1;
if ((ret = setsockopt (src->sock.fd, SOL_SOCKET, SO_BROADCAST, &bc_val,
sizeof (bc_val))) < 0) {
GST_ELEMENT_WARNING (src, RESOURCE, SETTINGS, (NULL),
("could not configure socket for broadcast %d: %s (%d)", ret,
g_strerror (errno), errno));
}
/* Accept ERRQUEUE to get and flush icmp errors */
err_val = 1;
if ((ret = setsockopt (src->sock.fd, IPPROTO_IP, IP_RECVERR, &err_val,
sizeof (err_val))) < 0) {
GST_ELEMENT_WARNING (src, RESOURCE, SETTINGS, (NULL),
("could not configure socket for IP_RECVERR %d: %s (%d)", ret,
g_strerror (errno), errno));
}
if (src->auto_multicast && gst_udp_is_multicast (&src->myaddr)) {
GST_DEBUG_OBJECT (src, "joining multicast group %s", src->multi_group);
ret = gst_udp_join_group (src->sock.fd, &src->myaddr);
if (ret < 0)
goto membership;
}
/* NOTE: sockaddr_in.sin_port works for ipv4 and ipv6 because sin_port
* follows ss_family on both */
port = g_ntohs (((struct sockaddr_in *) &src->myaddr)->sin_port);
GST_DEBUG_OBJECT (src, "bound, on port %d", port);
if (port != src->port) {
src->port = port;
GST_DEBUG_OBJECT (src, "notifying port %d", port);
g_object_notify (G_OBJECT (src), "port");
}
if ((src->fdset = gst_poll_new (TRUE)) == NULL)
goto no_fdset;
gst_poll_add_fd (src->fdset, &src->sock);
gst_poll_fd_ctl_read (src->fdset, &src->sock, TRUE);
return TRUE;
/* ERRORS */
getaddrinfo_error:
{
GST_ELEMENT_ERROR (src, RESOURCE, SETTINGS, (NULL),
("getaddrinfo failed %d: %s (%d)", ret, g_strerror (errno), errno));
return FALSE;
}
no_socket:
{
GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL),
("no socket error %d: %s (%d)", ret, g_strerror (errno), errno));
return FALSE;
}
setsockopt_error:
{
CLOSE_IF_REQUESTED (src);
GST_ELEMENT_ERROR (src, RESOURCE, SETTINGS, (NULL),
("setsockopt failed %d: %s (%d)", ret, g_strerror (errno), errno));
return FALSE;
}
bind_error:
{
CLOSE_IF_REQUESTED (src);
GST_ELEMENT_ERROR (src, RESOURCE, SETTINGS, (NULL),
("bind failed %d: %s (%d)", ret, g_strerror (errno), errno));
return FALSE;
}
membership:
{
CLOSE_IF_REQUESTED (src);
GST_ELEMENT_ERROR (src, RESOURCE, SETTINGS, (NULL),
("could add membership %d: %s (%d)", ret, g_strerror (errno), errno));
return FALSE;
}
getsockname_error:
{
CLOSE_IF_REQUESTED (src);
GST_ELEMENT_ERROR (src, RESOURCE, SETTINGS, (NULL),
("getsockname failed %d: %s (%d)", ret, g_strerror (errno), errno));
return FALSE;
}
no_fdset:
{
CLOSE_IF_REQUESTED (src);
GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ_WRITE, (NULL),
("could not create an fdset %d: %s (%d)", ret, g_strerror (errno),
errno));
return FALSE;
}
}
static gboolean
gst_udpsrc_unlock (GstBaseSrc * bsrc)
{
GstUDPSrc *src;
src = GST_UDPSRC (bsrc);
GST_LOG_OBJECT (src, "Flushing");
gst_poll_set_flushing (src->fdset, TRUE);
return TRUE;
}
static gboolean
gst_udpsrc_unlock_stop (GstBaseSrc * bsrc)
{
GstUDPSrc *src;
src = GST_UDPSRC (bsrc);
GST_LOG_OBJECT (src, "No longer flushing");
gst_poll_set_flushing (src->fdset, FALSE);
return TRUE;
}
static gboolean
gst_udpsrc_stop (GstBaseSrc * bsrc)
{
GstUDPSrc *src;
src = GST_UDPSRC (bsrc);
GST_DEBUG ("stopping, closing sockets");
if (src->sock.fd >= 0) {
if (src->auto_multicast && gst_udp_is_multicast (&src->myaddr)) {
GST_DEBUG_OBJECT (src, "leaving multicast group %s", src->multi_group);
gst_udp_leave_group (src->sock.fd, &src->myaddr);
}
CLOSE_IF_REQUESTED (src);
}
if (src->fdset) {
gst_poll_free (src->fdset);
src->fdset = NULL;
}
return TRUE;
}
/*** GSTURIHANDLER INTERFACE *************************************************/
static GstURIType
gst_udpsrc_uri_get_type (void)
{
return GST_URI_SRC;
}
static gchar **
gst_udpsrc_uri_get_protocols (void)
{
static gchar *protocols[] = { "udp", NULL };
return protocols;
}
static const gchar *
gst_udpsrc_uri_get_uri (GstURIHandler * handler)
{
GstUDPSrc *src = GST_UDPSRC (handler);
return src->uri;
}
static gboolean
gst_udpsrc_uri_set_uri (GstURIHandler * handler, const gchar * uri)
{
gboolean ret;
GstUDPSrc *src = GST_UDPSRC (handler);
ret = gst_udpsrc_set_uri (src, uri);
return ret;
}
static void
gst_udpsrc_uri_handler_init (gpointer g_iface, gpointer iface_data)
{
GstURIHandlerInterface *iface = (GstURIHandlerInterface *) g_iface;
iface->get_type = gst_udpsrc_uri_get_type;
iface->get_protocols = gst_udpsrc_uri_get_protocols;
iface->get_uri = gst_udpsrc_uri_get_uri;
iface->set_uri = gst_udpsrc_uri_set_uri;
}