gst/udp/gstudpsrc.*: Add property to control automatic join/leave of multicast groups.

Original commit message from CVS:
* gst/udp/gstudpsrc.c: (gst_udpsrc_class_init), (gst_udpsrc_init),
(gst_udpsrc_create), (gst_udpsrc_set_property),
(gst_udpsrc_get_property), (gst_udpsrc_start), (gst_udpsrc_stop):
* gst/udp/gstudpsrc.h:
Add property to control automatic join/leave of multicast groups.
Add G_LIKELY.
Remove setting caps on buffers explicitly, basesrc does that for us now.
Improve debug info.
Convert some non-fatal error into warnings.
Use g_ntohs for better portability.
Leave multicast groups when stopping.
When using external sockets, use getsockname() on them to fill up the
addr structure before calling methods that use the structure.
Should all fix #536903.
API: GstUDPSrc::auto-multicast property
This commit is contained in:
Wim Taymans 2008-06-13 11:54:05 +00:00
parent 5b751d0290
commit ccddfc5da7
3 changed files with 90 additions and 54 deletions

View file

@ -1,3 +1,21 @@
2008-06-13 Wim Taymans <wim.taymans@collabora.co.uk>
* gst/udp/gstudpsrc.c: (gst_udpsrc_class_init), (gst_udpsrc_init),
(gst_udpsrc_create), (gst_udpsrc_set_property),
(gst_udpsrc_get_property), (gst_udpsrc_start), (gst_udpsrc_stop):
* gst/udp/gstudpsrc.h:
Add property to control automatic join/leave of multicast groups.
Add G_LIKELY.
Remove setting caps on buffers explicitly, basesrc does that for us now.
Improve debug info.
Convert some non-fatal error into warnings.
Use g_ntohs for better portability.
Leave multicast groups when stopping.
When using external sockets, use getsockname() on them to fill up the
addr structure before calling methods that use the structure.
Should all fix #536903.
API: GstUDPSrc::auto-multicast property
2008-06-13 Wim Taymans <wim.taymans@collabora.co.uk> 2008-06-13 Wim Taymans <wim.taymans@collabora.co.uk>
* gst/udp/gstudpnetutils.c: (gst_udp_is_multicast): * gst/udp/gstudpnetutils.c: (gst_udp_is_multicast):

View file

@ -170,10 +170,12 @@ GST_ELEMENT_DETAILS ("UDP packet receiver",
#define UDP_DEFAULT_SKIP_FIRST_BYTES 0 #define UDP_DEFAULT_SKIP_FIRST_BYTES 0
#define UDP_DEFAULT_CLOSEFD TRUE #define UDP_DEFAULT_CLOSEFD TRUE
#define UDP_DEFAULT_SOCK -1 #define UDP_DEFAULT_SOCK -1
#define UDP_DEFAULT_AUTO_MULTICAST TRUE
enum enum
{ {
PROP_0, PROP_0,
PROP_PORT, PROP_PORT,
PROP_MULTICAST_GROUP, PROP_MULTICAST_GROUP,
PROP_URI, PROP_URI,
@ -183,7 +185,10 @@ enum
PROP_TIMEOUT, PROP_TIMEOUT,
PROP_SKIP_FIRST_BYTES, PROP_SKIP_FIRST_BYTES,
PROP_CLOSEFD, PROP_CLOSEFD,
PROP_SOCK PROP_SOCK,
PROP_AUTO_MULTICAST,
PROP_LAST
}; };
static void gst_udpsrc_uri_handler_init (gpointer g_iface, gpointer iface_data); static void gst_udpsrc_uri_handler_init (gpointer g_iface, gpointer iface_data);
@ -283,6 +288,10 @@ gst_udpsrc_class_init (GstUDPSrcClass * klass)
g_param_spec_int ("sock", "Socket Handle", g_param_spec_int ("sock", "Socket Handle",
"Socket currently in use for UDP reception. (-1 = no socket)", "Socket currently in use for UDP reception. (-1 = no socket)",
-1, G_MAXINT, UDP_DEFAULT_SOCK, G_PARAM_READABLE)); -1, G_MAXINT, UDP_DEFAULT_SOCK, G_PARAM_READABLE));
g_object_class_install_property (gobject_class, PROP_AUTO_MULTICAST,
g_param_spec_boolean ("auto-multicast", "Auto Multicast",
"Automatically join/leave multicast groups",
UDP_DEFAULT_AUTO_MULTICAST, G_PARAM_READWRITE));
gstbasesrc_class->start = gst_udpsrc_start; gstbasesrc_class->start = gst_udpsrc_start;
gstbasesrc_class->stop = gst_udpsrc_stop; gstbasesrc_class->stop = gst_udpsrc_stop;
@ -298,7 +307,6 @@ gst_udpsrc_init (GstUDPSrc * udpsrc, GstUDPSrcClass * g_class)
{ {
WSA_STARTUP (udpsrc); WSA_STARTUP (udpsrc);
gst_base_src_set_live (GST_BASE_SRC (udpsrc), TRUE);
udpsrc->port = UDP_DEFAULT_PORT; udpsrc->port = UDP_DEFAULT_PORT;
udpsrc->sockfd = UDP_DEFAULT_SOCKFD; udpsrc->sockfd = UDP_DEFAULT_SOCKFD;
udpsrc->multi_group = g_strdup (UDP_DEFAULT_MULTICAST_GROUP); udpsrc->multi_group = g_strdup (UDP_DEFAULT_MULTICAST_GROUP);
@ -308,9 +316,15 @@ gst_udpsrc_init (GstUDPSrc * udpsrc, GstUDPSrcClass * g_class)
udpsrc->skip_first_bytes = UDP_DEFAULT_SKIP_FIRST_BYTES; udpsrc->skip_first_bytes = UDP_DEFAULT_SKIP_FIRST_BYTES;
udpsrc->closefd = UDP_DEFAULT_CLOSEFD; udpsrc->closefd = UDP_DEFAULT_CLOSEFD;
udpsrc->externalfd = (udpsrc->sockfd != -1); udpsrc->externalfd = (udpsrc->sockfd != -1);
udpsrc->auto_multicast = UDP_DEFAULT_AUTO_MULTICAST;
udpsrc->sock.fd = UDP_DEFAULT_SOCK; udpsrc->sock.fd = UDP_DEFAULT_SOCK;
/* configure basesrc to be a live source */
gst_base_src_set_live (GST_BASE_SRC (udpsrc), TRUE);
/* make basesrc output a segment in time */
gst_base_src_set_format (GST_BASE_SRC (udpsrc), GST_FORMAT_TIME); gst_base_src_set_format (GST_BASE_SRC (udpsrc), GST_FORMAT_TIME);
/* make basesrc set timestamps on outgoing buffers based on the running_time
* when they were captured */
gst_base_src_set_do_timestamp (GST_BASE_SRC (udpsrc), TRUE); gst_base_src_set_do_timestamp (GST_BASE_SRC (udpsrc), TRUE);
} }
@ -363,12 +377,13 @@ gst_udpsrc_create (GstPushSrc * psrc, GstBuffer ** buf)
gint ret; gint ret;
gboolean try_again; gboolean try_again;
udpsrc = GST_UDPSRC (psrc); udpsrc = GST_UDPSRC_CAST (psrc);
retry: retry:
/* quick check, avoid going in select when we already have data */ /* quick check, avoid going in select when we already have data */
readsize = 0; readsize = 0;
if ((ret = IOCTL_SOCKET (udpsrc->sock.fd, FIONREAD, &readsize)) < 0) if (G_UNLIKELY ((ret =
IOCTL_SOCKET (udpsrc->sock.fd, FIONREAD, &readsize)) < 0))
goto ioctl_failed; goto ioctl_failed;
if (readsize > 0) if (readsize > 0)
@ -388,7 +403,7 @@ retry:
ret = gst_poll_wait (udpsrc->fdset, timeout); ret = gst_poll_wait (udpsrc->fdset, timeout);
GST_LOG_OBJECT (udpsrc, "select returned %d", ret); GST_LOG_OBJECT (udpsrc, "select returned %d", ret);
if (ret < 0) { if (G_UNLIKELY (ret < 0)) {
if (errno == EBUSY) if (errno == EBUSY)
goto stopped; goto stopped;
#ifdef G_OS_WIN32 #ifdef G_OS_WIN32
@ -399,7 +414,7 @@ retry:
goto select_error; goto select_error;
#endif #endif
try_again = TRUE; try_again = TRUE;
} else if (ret == 0) { } else if (G_UNLIKELY (ret == 0)) {
/* timeout, post element message */ /* timeout, post element message */
gst_element_post_message (GST_ELEMENT_CAST (udpsrc), gst_element_post_message (GST_ELEMENT_CAST (udpsrc),
gst_message_new_element (GST_OBJECT_CAST (udpsrc), gst_message_new_element (GST_OBJECT_CAST (udpsrc),
@ -407,20 +422,21 @@ retry:
"timeout", G_TYPE_UINT64, udpsrc->timeout, NULL))); "timeout", G_TYPE_UINT64, udpsrc->timeout, NULL)));
try_again = TRUE; try_again = TRUE;
} }
} while (try_again); } while (G_UNLIKELY (try_again));
/* ask how much is available for reading on the socket, this should be exactly /* ask how much is available for reading on the socket, this should be exactly
* one UDP packet. We will check the return value, though, because in some * one UDP packet. We will check the return value, though, because in some
* case it can return 0 and we don't want a 0 sized buffer. */ * case it can return 0 and we don't want a 0 sized buffer. */
readsize = 0; readsize = 0;
if ((ret = IOCTL_SOCKET (udpsrc->sock.fd, FIONREAD, &readsize)) < 0) if (G_UNLIKELY ((ret =
IOCTL_SOCKET (udpsrc->sock.fd, FIONREAD, &readsize)) < 0))
goto ioctl_failed; goto ioctl_failed;
/* if we get here and there is nothing to read from the socket, the select got /* if we get here and there is nothing to read from the socket, the select got
* woken up by activity on the socket but it was not a read. We how someone * woken up by activity on the socket but it was not a read. We know someone
* will also do something with the socket so that we don't go into an infinite * will also do something with the socket so that we don't go into an infinite
* loop in the select(). */ * loop in the select(). */
if (!readsize) if (G_UNLIKELY (!readsize))
goto retry; goto retry;
no_select: no_select:
@ -433,7 +449,7 @@ no_select:
len = sizeof (struct sockaddr); len = sizeof (struct sockaddr);
ret = recvfrom (udpsrc->sock.fd, pktdata, pktsize, ret = recvfrom (udpsrc->sock.fd, pktdata, pktsize,
0, (struct sockaddr *) &tmpaddr, &len); 0, (struct sockaddr *) &tmpaddr, &len);
if (ret < 0) { if (G_UNLIKELY (ret < 0)) {
#ifdef G_OS_WIN32 #ifdef G_OS_WIN32
/* WSAECONNRESET for a UDP socket means that a packet sent with udpsink /* WSAECONNRESET for a UDP socket means that a packet sent with udpsink
* generated a "port unreachable" ICMP response. We ignore that and try * generated a "port unreachable" ICMP response. We ignore that and try
@ -458,7 +474,7 @@ no_select:
GST_BUFFER_MALLOCDATA (outbuf) = pktdata; GST_BUFFER_MALLOCDATA (outbuf) = pktdata;
/* patch pktdata and len when stripping off the headers */ /* patch pktdata and len when stripping off the headers */
if (udpsrc->skip_first_bytes != 0) { if (G_UNLIKELY (udpsrc->skip_first_bytes != 0)) {
if (G_UNLIKELY (readsize <= udpsrc->skip_first_bytes)) if (G_UNLIKELY (readsize <= udpsrc->skip_first_bytes))
goto skip_error; goto skip_error;
@ -490,9 +506,6 @@ no_select:
errno = EAFNOSUPPORT; errno = EAFNOSUPPORT;
goto receive_error; goto receive_error;
} }
gst_buffer_set_caps (GST_BUFFER_CAST (outbuf), udpsrc->caps);
GST_LOG_OBJECT (udpsrc, "read %d bytes", (int) readsize); GST_LOG_OBJECT (udpsrc, "read %d bytes", (int) readsize);
*buf = GST_BUFFER_CAST (outbuf); *buf = GST_BUFFER_CAST (outbuf);
@ -647,6 +660,9 @@ gst_udpsrc_set_property (GObject * object, guint prop_id, const GValue * value,
case PROP_CLOSEFD: case PROP_CLOSEFD:
udpsrc->closefd = g_value_get_boolean (value); udpsrc->closefd = g_value_get_boolean (value);
break; break;
case PROP_AUTO_MULTICAST:
udpsrc->auto_multicast = g_value_get_boolean (value);
break;
default: default:
break; break;
} }
@ -689,6 +705,9 @@ gst_udpsrc_get_property (GObject * object, guint prop_id, GValue * value,
case PROP_SOCK: case PROP_SOCK:
g_value_set_int (value, udpsrc->sock.fd); g_value_set_int (value, udpsrc->sock.fd);
break; break;
case PROP_AUTO_MULTICAST:
g_value_set_boolean (value, udpsrc->auto_multicast);
break;
default: default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break; break;
@ -701,20 +720,22 @@ gst_udpsrc_start (GstBaseSrc * bsrc)
{ {
guint bc_val; guint bc_val;
gint reuse; gint reuse;
struct sockaddr_storage my_addr;
guint len;
int port; int port;
GstUDPSrc *src; GstUDPSrc *src;
gint ret; gint ret;
int rcvsize; int rcvsize;
guint len;
src = GST_UDPSRC (bsrc); src = GST_UDPSRC (bsrc);
if (src->sockfd == -1) { if (src->sockfd == -1) {
/* need to allocate a socket */ /* need to allocate a socket */
GST_DEBUG_OBJECT (src, "allocating socket for %s:%d", src->multi_group,
src->port);
if ((ret = if ((ret =
gst_udp_get_addr (src->multi_group, src->port, &src->myaddr)) < 0) gst_udp_get_addr (src->multi_group, src->port, &src->myaddr)) < 0)
goto getaddrinfo_error; goto getaddrinfo_error;
if ((ret = socket (src->myaddr.ss_family, SOCK_DGRAM, IPPROTO_UDP)) < 0) if ((ret = socket (src->myaddr.ss_family, SOCK_DGRAM, IPPROTO_UDP)) < 0)
goto no_socket; goto no_socket;
@ -732,22 +753,18 @@ gst_udpsrc_start (GstBaseSrc * bsrc)
sizeof (src->myaddr))) < 0) sizeof (src->myaddr))) < 0)
goto bind_error; goto bind_error;
} else { } else {
/* we use the configured socket */ GST_DEBUG_OBJECT (src, "using provided socket %d", src->sockfd);
/* we use the configured socket, try to get some info about it */
len = sizeof (src->myaddr);
if ((ret =
getsockname (src->sockfd, (struct sockaddr *) &src->myaddr,
&len)) < 0)
goto getsockname_error;
src->sock.fd = src->sockfd; src->sock.fd = src->sockfd;
src->externalfd = TRUE; src->externalfd = TRUE;
} }
if (gst_udp_is_multicast (&src->myaddr)) {
ret = gst_udp_join_group (src->sock.fd, &src->myaddr);
if (ret < 0)
goto membership;
}
len = sizeof (my_addr);
if ((ret =
getsockname (src->sock.fd, (struct sockaddr *) &my_addr, &len)) < 0)
goto getsockname_error;
len = sizeof (rcvsize); len = sizeof (rcvsize);
if (src->buffer_size != 0) { if (src->buffer_size != 0) {
rcvsize = src->buffer_size; rcvsize = src->buffer_size;
@ -759,8 +776,11 @@ gst_udpsrc_start (GstBaseSrc * bsrc)
ret = ret =
setsockopt (src->sock.fd, SOL_SOCKET, SO_RCVBUF, (void *) &rcvsize, setsockopt (src->sock.fd, SOL_SOCKET, SO_RCVBUF, (void *) &rcvsize,
len); len);
if (ret != 0) if (ret != 0) {
goto udpbuffer_error; GST_ELEMENT_WARNING (src, RESOURCE, SETTINGS, (NULL),
("Could not create a buffer of requested %d bytes, %d: %s (%d)",
rcvsize, ret, g_strerror (errno), errno));
}
} }
/* read the value of the receive buffer. Note that on linux this returns 2x the /* read the value of the receive buffer. Note that on linux this returns 2x the
@ -775,21 +795,29 @@ gst_udpsrc_start (GstBaseSrc * bsrc)
bc_val = 1; bc_val = 1;
if ((ret = setsockopt (src->sock.fd, SOL_SOCKET, SO_BROADCAST, &bc_val, if ((ret = setsockopt (src->sock.fd, SOL_SOCKET, SO_BROADCAST, &bc_val,
sizeof (bc_val))) < 0) sizeof (bc_val))) < 0) {
goto no_broadcast; GST_ELEMENT_WARNING (src, RESOURCE, SETTINGS, (NULL),
("could not configure socket for broadcast %d: %s (%d)", ret,
g_strerror (errno), errno));
}
if (src->auto_multicast && gst_udp_is_multicast (&src->myaddr)) {
GST_DEBUG_OBJECT (src, "joining multicast group %s", src->multi_group);
ret = gst_udp_join_group (src->sock.fd, &src->myaddr);
if (ret < 0)
goto membership;
}
/* NOTE: sockaddr_in.sin_port works for ipv4 and ipv6 because sin_port /* NOTE: sockaddr_in.sin_port works for ipv4 and ipv6 because sin_port
* follows ss_family on both */ * follows ss_family on both */
port = ntohs (((struct sockaddr_in *) &my_addr)->sin_port); port = g_ntohs (((struct sockaddr_in *) &src->myaddr)->sin_port);
GST_DEBUG_OBJECT (src, "bound, on port %d", port); GST_DEBUG_OBJECT (src, "bound, on port %d", port);
if (port != src->port) { if (port != src->port) {
src->port = port; src->port = port;
GST_DEBUG_OBJECT (src, "notifying %d", port); GST_DEBUG_OBJECT (src, "notifying port %d", port);
g_object_notify (G_OBJECT (src), "port"); g_object_notify (G_OBJECT (src), "port");
} }
((struct sockaddr_in *) &src->myaddr)->sin_port = htons (src->port + 1);
if ((src->fdset = gst_poll_new (TRUE)) == NULL) if ((src->fdset = gst_poll_new (TRUE)) == NULL)
goto no_fdset; goto no_fdset;
@ -839,22 +867,6 @@ getsockname_error:
("getsockname failed %d: %s (%d)", ret, g_strerror (errno), errno)); ("getsockname failed %d: %s (%d)", ret, g_strerror (errno), errno));
return FALSE; return FALSE;
} }
udpbuffer_error:
{
CLOSE_IF_REQUESTED (src);
GST_ELEMENT_ERROR (src, RESOURCE, SETTINGS, (NULL),
("Could not create a buffer of the size requested, %d: %s (%d)", ret,
g_strerror (errno), errno));
return FALSE;
}
no_broadcast:
{
CLOSE_IF_REQUESTED (src);
GST_ELEMENT_ERROR (src, RESOURCE, SETTINGS, (NULL),
("could not configure socket for broadcast %d: %s (%d)", ret,
g_strerror (errno), errno));
return FALSE;
}
no_fdset: no_fdset:
{ {
CLOSE_IF_REQUESTED (src); CLOSE_IF_REQUESTED (src);
@ -901,6 +913,10 @@ gst_udpsrc_stop (GstBaseSrc * bsrc)
GST_DEBUG ("stopping, closing sockets"); GST_DEBUG ("stopping, closing sockets");
if (src->sock.fd >= 0) { if (src->sock.fd >= 0) {
if (src->auto_multicast && gst_udp_is_multicast (&src->myaddr)) {
GST_DEBUG_OBJECT (src, "leaving multicast group %s", src->multi_group);
gst_udp_leave_group (src->sock.fd, &src->myaddr);
}
CLOSE_IF_REQUESTED (src); CLOSE_IF_REQUESTED (src);
} }

View file

@ -44,6 +44,7 @@ G_BEGIN_DECLS
(G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_UDPSRC)) (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_UDPSRC))
#define GST_IS_UDPSRC_CLASS(klass) \ #define GST_IS_UDPSRC_CLASS(klass) \
(G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_UDPSRC)) (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_UDPSRC))
#define GST_UDPSRC_CAST(obj) ((GstUDPSrc *)(obj))
typedef struct _GstUDPSrc GstUDPSrc; typedef struct _GstUDPSrc GstUDPSrc;
typedef struct _GstUDPSrcClass GstUDPSrcClass; typedef struct _GstUDPSrcClass GstUDPSrcClass;
@ -62,6 +63,7 @@ struct _GstUDPSrc {
gint skip_first_bytes; gint skip_first_bytes;
int sockfd; int sockfd;
gboolean closefd; gboolean closefd;
gboolean auto_multicast;
/* our sockets */ /* our sockets */
GstPollFD sock; GstPollFD sock;