diff --git a/ChangeLog b/ChangeLog index cdb2dddf69..7929a211ab 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,21 @@ +2008-06-13 Wim Taymans + + * gst/udp/gstudpsrc.c: (gst_udpsrc_class_init), (gst_udpsrc_init), + (gst_udpsrc_create), (gst_udpsrc_set_property), + (gst_udpsrc_get_property), (gst_udpsrc_start), (gst_udpsrc_stop): + * gst/udp/gstudpsrc.h: + Add property to control automatic join/leave of multicast groups. + Add G_LIKELY. + Remove setting caps on buffers explicitly, basesrc does that for us now. + Improve debug info. + Convert some non-fatal error into warnings. + Use g_ntohs for better portability. + Leave multicast groups when stopping. + When using external sockets, use getsockname() on them to fill up the + addr structure before calling methods that use the structure. + Should all fix #536903. + API: GstUDPSrc::auto-multicast property + 2008-06-13 Wim Taymans * gst/udp/gstudpnetutils.c: (gst_udp_is_multicast): diff --git a/gst/udp/gstudpsrc.c b/gst/udp/gstudpsrc.c index 40d91537e9..a934134448 100644 --- a/gst/udp/gstudpsrc.c +++ b/gst/udp/gstudpsrc.c @@ -170,10 +170,12 @@ GST_ELEMENT_DETAILS ("UDP packet receiver", #define UDP_DEFAULT_SKIP_FIRST_BYTES 0 #define UDP_DEFAULT_CLOSEFD TRUE #define UDP_DEFAULT_SOCK -1 +#define UDP_DEFAULT_AUTO_MULTICAST TRUE enum { PROP_0, + PROP_PORT, PROP_MULTICAST_GROUP, PROP_URI, @@ -183,7 +185,10 @@ enum PROP_TIMEOUT, PROP_SKIP_FIRST_BYTES, PROP_CLOSEFD, - PROP_SOCK + PROP_SOCK, + PROP_AUTO_MULTICAST, + + PROP_LAST }; static void gst_udpsrc_uri_handler_init (gpointer g_iface, gpointer iface_data); @@ -283,6 +288,10 @@ gst_udpsrc_class_init (GstUDPSrcClass * klass) g_param_spec_int ("sock", "Socket Handle", "Socket currently in use for UDP reception. (-1 = no socket)", -1, G_MAXINT, UDP_DEFAULT_SOCK, G_PARAM_READABLE)); + g_object_class_install_property (gobject_class, PROP_AUTO_MULTICAST, + g_param_spec_boolean ("auto-multicast", "Auto Multicast", + "Automatically join/leave multicast groups", + UDP_DEFAULT_AUTO_MULTICAST, G_PARAM_READWRITE)); gstbasesrc_class->start = gst_udpsrc_start; gstbasesrc_class->stop = gst_udpsrc_stop; @@ -298,7 +307,6 @@ gst_udpsrc_init (GstUDPSrc * udpsrc, GstUDPSrcClass * g_class) { WSA_STARTUP (udpsrc); - gst_base_src_set_live (GST_BASE_SRC (udpsrc), TRUE); udpsrc->port = UDP_DEFAULT_PORT; udpsrc->sockfd = UDP_DEFAULT_SOCKFD; udpsrc->multi_group = g_strdup (UDP_DEFAULT_MULTICAST_GROUP); @@ -308,9 +316,15 @@ gst_udpsrc_init (GstUDPSrc * udpsrc, GstUDPSrcClass * g_class) udpsrc->skip_first_bytes = UDP_DEFAULT_SKIP_FIRST_BYTES; udpsrc->closefd = UDP_DEFAULT_CLOSEFD; udpsrc->externalfd = (udpsrc->sockfd != -1); - + udpsrc->auto_multicast = UDP_DEFAULT_AUTO_MULTICAST; udpsrc->sock.fd = UDP_DEFAULT_SOCK; + + /* configure basesrc to be a live source */ + gst_base_src_set_live (GST_BASE_SRC (udpsrc), TRUE); + /* make basesrc output a segment in time */ gst_base_src_set_format (GST_BASE_SRC (udpsrc), GST_FORMAT_TIME); + /* make basesrc set timestamps on outgoing buffers based on the running_time + * when they were captured */ gst_base_src_set_do_timestamp (GST_BASE_SRC (udpsrc), TRUE); } @@ -363,12 +377,13 @@ gst_udpsrc_create (GstPushSrc * psrc, GstBuffer ** buf) gint ret; gboolean try_again; - udpsrc = GST_UDPSRC (psrc); + udpsrc = GST_UDPSRC_CAST (psrc); retry: /* quick check, avoid going in select when we already have data */ readsize = 0; - if ((ret = IOCTL_SOCKET (udpsrc->sock.fd, FIONREAD, &readsize)) < 0) + if (G_UNLIKELY ((ret = + IOCTL_SOCKET (udpsrc->sock.fd, FIONREAD, &readsize)) < 0)) goto ioctl_failed; if (readsize > 0) @@ -388,7 +403,7 @@ retry: ret = gst_poll_wait (udpsrc->fdset, timeout); GST_LOG_OBJECT (udpsrc, "select returned %d", ret); - if (ret < 0) { + if (G_UNLIKELY (ret < 0)) { if (errno == EBUSY) goto stopped; #ifdef G_OS_WIN32 @@ -399,7 +414,7 @@ retry: goto select_error; #endif try_again = TRUE; - } else if (ret == 0) { + } else if (G_UNLIKELY (ret == 0)) { /* timeout, post element message */ gst_element_post_message (GST_ELEMENT_CAST (udpsrc), gst_message_new_element (GST_OBJECT_CAST (udpsrc), @@ -407,20 +422,21 @@ retry: "timeout", G_TYPE_UINT64, udpsrc->timeout, NULL))); try_again = TRUE; } - } while (try_again); + } while (G_UNLIKELY (try_again)); /* ask how much is available for reading on the socket, this should be exactly * one UDP packet. We will check the return value, though, because in some * case it can return 0 and we don't want a 0 sized buffer. */ readsize = 0; - if ((ret = IOCTL_SOCKET (udpsrc->sock.fd, FIONREAD, &readsize)) < 0) + if (G_UNLIKELY ((ret = + IOCTL_SOCKET (udpsrc->sock.fd, FIONREAD, &readsize)) < 0)) goto ioctl_failed; /* if we get here and there is nothing to read from the socket, the select got - * woken up by activity on the socket but it was not a read. We how someone + * woken up by activity on the socket but it was not a read. We know someone * will also do something with the socket so that we don't go into an infinite * loop in the select(). */ - if (!readsize) + if (G_UNLIKELY (!readsize)) goto retry; no_select: @@ -433,7 +449,7 @@ no_select: len = sizeof (struct sockaddr); ret = recvfrom (udpsrc->sock.fd, pktdata, pktsize, 0, (struct sockaddr *) &tmpaddr, &len); - if (ret < 0) { + if (G_UNLIKELY (ret < 0)) { #ifdef G_OS_WIN32 /* WSAECONNRESET for a UDP socket means that a packet sent with udpsink * generated a "port unreachable" ICMP response. We ignore that and try @@ -458,7 +474,7 @@ no_select: GST_BUFFER_MALLOCDATA (outbuf) = pktdata; /* patch pktdata and len when stripping off the headers */ - if (udpsrc->skip_first_bytes != 0) { + if (G_UNLIKELY (udpsrc->skip_first_bytes != 0)) { if (G_UNLIKELY (readsize <= udpsrc->skip_first_bytes)) goto skip_error; @@ -490,9 +506,6 @@ no_select: errno = EAFNOSUPPORT; goto receive_error; } - - gst_buffer_set_caps (GST_BUFFER_CAST (outbuf), udpsrc->caps); - GST_LOG_OBJECT (udpsrc, "read %d bytes", (int) readsize); *buf = GST_BUFFER_CAST (outbuf); @@ -647,6 +660,9 @@ gst_udpsrc_set_property (GObject * object, guint prop_id, const GValue * value, case PROP_CLOSEFD: udpsrc->closefd = g_value_get_boolean (value); break; + case PROP_AUTO_MULTICAST: + udpsrc->auto_multicast = g_value_get_boolean (value); + break; default: break; } @@ -689,6 +705,9 @@ gst_udpsrc_get_property (GObject * object, guint prop_id, GValue * value, case PROP_SOCK: g_value_set_int (value, udpsrc->sock.fd); break; + case PROP_AUTO_MULTICAST: + g_value_set_boolean (value, udpsrc->auto_multicast); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -701,20 +720,22 @@ gst_udpsrc_start (GstBaseSrc * bsrc) { guint bc_val; gint reuse; - struct sockaddr_storage my_addr; - guint len; int port; GstUDPSrc *src; gint ret; int rcvsize; + guint len; src = GST_UDPSRC (bsrc); if (src->sockfd == -1) { /* need to allocate a socket */ + GST_DEBUG_OBJECT (src, "allocating socket for %s:%d", src->multi_group, + src->port); if ((ret = gst_udp_get_addr (src->multi_group, src->port, &src->myaddr)) < 0) goto getaddrinfo_error; + if ((ret = socket (src->myaddr.ss_family, SOCK_DGRAM, IPPROTO_UDP)) < 0) goto no_socket; @@ -732,22 +753,18 @@ gst_udpsrc_start (GstBaseSrc * bsrc) sizeof (src->myaddr))) < 0) goto bind_error; } else { - /* we use the configured socket */ + GST_DEBUG_OBJECT (src, "using provided socket %d", src->sockfd); + /* we use the configured socket, try to get some info about it */ + len = sizeof (src->myaddr); + if ((ret = + getsockname (src->sockfd, (struct sockaddr *) &src->myaddr, + &len)) < 0) + goto getsockname_error; + src->sock.fd = src->sockfd; src->externalfd = TRUE; } - if (gst_udp_is_multicast (&src->myaddr)) { - ret = gst_udp_join_group (src->sock.fd, &src->myaddr); - if (ret < 0) - goto membership; - } - - len = sizeof (my_addr); - if ((ret = - getsockname (src->sock.fd, (struct sockaddr *) &my_addr, &len)) < 0) - goto getsockname_error; - len = sizeof (rcvsize); if (src->buffer_size != 0) { rcvsize = src->buffer_size; @@ -759,8 +776,11 @@ gst_udpsrc_start (GstBaseSrc * bsrc) ret = setsockopt (src->sock.fd, SOL_SOCKET, SO_RCVBUF, (void *) &rcvsize, len); - if (ret != 0) - goto udpbuffer_error; + if (ret != 0) { + GST_ELEMENT_WARNING (src, RESOURCE, SETTINGS, (NULL), + ("Could not create a buffer of requested %d bytes, %d: %s (%d)", + rcvsize, ret, g_strerror (errno), errno)); + } } /* read the value of the receive buffer. Note that on linux this returns 2x the @@ -775,21 +795,29 @@ gst_udpsrc_start (GstBaseSrc * bsrc) bc_val = 1; if ((ret = setsockopt (src->sock.fd, SOL_SOCKET, SO_BROADCAST, &bc_val, - sizeof (bc_val))) < 0) - goto no_broadcast; + sizeof (bc_val))) < 0) { + GST_ELEMENT_WARNING (src, RESOURCE, SETTINGS, (NULL), + ("could not configure socket for broadcast %d: %s (%d)", ret, + g_strerror (errno), errno)); + } + + if (src->auto_multicast && gst_udp_is_multicast (&src->myaddr)) { + GST_DEBUG_OBJECT (src, "joining multicast group %s", src->multi_group); + ret = gst_udp_join_group (src->sock.fd, &src->myaddr); + if (ret < 0) + goto membership; + } /* NOTE: sockaddr_in.sin_port works for ipv4 and ipv6 because sin_port * follows ss_family on both */ - port = ntohs (((struct sockaddr_in *) &my_addr)->sin_port); + port = g_ntohs (((struct sockaddr_in *) &src->myaddr)->sin_port); GST_DEBUG_OBJECT (src, "bound, on port %d", port); if (port != src->port) { src->port = port; - GST_DEBUG_OBJECT (src, "notifying %d", port); + GST_DEBUG_OBJECT (src, "notifying port %d", port); g_object_notify (G_OBJECT (src), "port"); } - ((struct sockaddr_in *) &src->myaddr)->sin_port = htons (src->port + 1); - if ((src->fdset = gst_poll_new (TRUE)) == NULL) goto no_fdset; @@ -839,22 +867,6 @@ getsockname_error: ("getsockname failed %d: %s (%d)", ret, g_strerror (errno), errno)); return FALSE; } -udpbuffer_error: - { - CLOSE_IF_REQUESTED (src); - GST_ELEMENT_ERROR (src, RESOURCE, SETTINGS, (NULL), - ("Could not create a buffer of the size requested, %d: %s (%d)", ret, - g_strerror (errno), errno)); - return FALSE; - } -no_broadcast: - { - CLOSE_IF_REQUESTED (src); - GST_ELEMENT_ERROR (src, RESOURCE, SETTINGS, (NULL), - ("could not configure socket for broadcast %d: %s (%d)", ret, - g_strerror (errno), errno)); - return FALSE; - } no_fdset: { CLOSE_IF_REQUESTED (src); @@ -901,6 +913,10 @@ gst_udpsrc_stop (GstBaseSrc * bsrc) GST_DEBUG ("stopping, closing sockets"); if (src->sock.fd >= 0) { + if (src->auto_multicast && gst_udp_is_multicast (&src->myaddr)) { + GST_DEBUG_OBJECT (src, "leaving multicast group %s", src->multi_group); + gst_udp_leave_group (src->sock.fd, &src->myaddr); + } CLOSE_IF_REQUESTED (src); } diff --git a/gst/udp/gstudpsrc.h b/gst/udp/gstudpsrc.h index 597a6a708f..cdf7b35e39 100644 --- a/gst/udp/gstudpsrc.h +++ b/gst/udp/gstudpsrc.h @@ -44,6 +44,7 @@ G_BEGIN_DECLS (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_UDPSRC)) #define GST_IS_UDPSRC_CLASS(klass) \ (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_UDPSRC)) +#define GST_UDPSRC_CAST(obj) ((GstUDPSrc *)(obj)) typedef struct _GstUDPSrc GstUDPSrc; typedef struct _GstUDPSrcClass GstUDPSrcClass; @@ -62,6 +63,7 @@ struct _GstUDPSrc { gint skip_first_bytes; int sockfd; gboolean closefd; + gboolean auto_multicast; /* our sockets */ GstPollFD sock;