diff --git a/gst/udp/gstudpsrc.c b/gst/udp/gstudpsrc.c index f1ac2231e7..c6e41d4cb7 100644 --- a/gst/udp/gstudpsrc.c +++ b/gst/udp/gstudpsrc.c @@ -503,16 +503,14 @@ gst_udpsrc_ensure_mem (GstUDPSrc * src) static GstFlowReturn gst_udpsrc_create (GstPushSrc * psrc, GstBuffer ** buf) { - GstFlowReturn ret; GstUDPSrc *udpsrc; GstBuffer *outbuf = NULL; - GstMapInfo info; GSocketAddress *saddr = NULL; - gsize offset; - gssize readsize; - gssize res; + gint flags = G_SOCKET_MSG_NONE; gboolean try_again; GError *err = NULL; + gssize res; + gsize offset; udpsrc = GST_UDPSRC_CAST (psrc); @@ -520,10 +518,6 @@ gst_udpsrc_create (GstPushSrc * psrc, GstBuffer ** buf) goto memory_alloc_error; retry: - /* quick check, avoid going in select when we already have data */ - readsize = g_socket_get_available_bytes (udpsrc->used_socket); - if (readsize > 0) - goto no_select; do { gint64 timeout; @@ -557,63 +551,20 @@ retry: } } while (G_UNLIKELY (try_again)); - /* ask how much is available for reading on the socket, this should be exactly - * one UDP packet. We will check the return value, though, because in some - * case it can return 0 and we don't want a 0 sized buffer. */ - readsize = g_socket_get_available_bytes (udpsrc->used_socket); - if (G_UNLIKELY (readsize < 0)) - goto get_available_error; - - /* If we get here and the readsize is zero, then either select was woken up - * by activity that is not a read, or a poll error occurred, or a UDP packet - * was received that has no data. Since we cannot identify which case it is, - * we handle all of them. This could possibly lead to a UDP packet getting - * lost, but since UDP is not reliable, we can accept this. */ - if (G_UNLIKELY (!readsize)) { - /* try to read a packet (and it will be ignored), - * in case a packet with no data arrived */ - res = - g_socket_receive_from (udpsrc->used_socket, NULL, NULL, - 0, udpsrc->cancellable, &err); - if (G_UNLIKELY (res < 0)) - goto receive_error; - - /* poll again */ - goto retry; + if (saddr != NULL) { + g_object_unref (saddr); + saddr = NULL; } -no_select: - GST_LOG_OBJECT (udpsrc, "ioctl says %d bytes available", (int) readsize); - - /* sanity check value from _get_available_bytes(), which might be as - * large as the kernel-side buffer on some operating systems */ - if (g_socket_get_family (udpsrc->used_socket) == G_SOCKET_FAMILY_IPV4) - readsize = MIN (MAX_IPV4_UDP_PACKET_SIZE, readsize); - - ret = GST_BASE_SRC_CLASS (parent_class)->alloc (GST_BASE_SRC_CAST (udpsrc), - -1, readsize, &outbuf); - if (ret != GST_FLOW_OK) - goto alloc_failed; - - gst_buffer_map (outbuf, &info, GST_MAP_WRITE); - offset = 0; - - if (saddr) - g_object_unref (saddr); - saddr = NULL; - res = - g_socket_receive_from (udpsrc->used_socket, &saddr, (gchar *) info.data, - info.size, udpsrc->cancellable, &err); + g_socket_receive_message (udpsrc->used_socket, &saddr, udpsrc->vec, 2, + NULL, NULL, &flags, udpsrc->cancellable, &err); if (G_UNLIKELY (res < 0)) { /* EHOSTUNREACH for a UDP socket means that a packet sent with udpsink * generated a "port unreachable" ICMP response. We ignore that and try * again. */ if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_HOST_UNREACHABLE)) { - gst_buffer_unmap (outbuf, &info); - gst_buffer_unref (outbuf); - outbuf = NULL; g_clear_error (&err); goto retry; } @@ -624,30 +575,46 @@ no_select: if (res > udpsrc->max_size) udpsrc->max_size = res; - /* patch offset and size when stripping off the headers */ - if (G_UNLIKELY (udpsrc->skip_first_bytes != 0)) { - if (G_UNLIKELY (readsize < udpsrc->skip_first_bytes)) - goto skip_error; + outbuf = gst_buffer_new (); - offset += udpsrc->skip_first_bytes; - res -= udpsrc->skip_first_bytes; + /* append first memory chunk to buffer */ + gst_buffer_append_memory (outbuf, udpsrc->mem); + + /* if the packet didn't fit into the first chunk, add second one as well */ + if (res > udpsrc->map.size) { + gst_buffer_append_memory (outbuf, udpsrc->mem_max); + gst_memory_unmap (udpsrc->mem_max, &udpsrc->map_max); + udpsrc->vec[1].buffer = NULL; + udpsrc->vec[1].size = 0; + udpsrc->mem_max = NULL; } - gst_buffer_unmap (outbuf, &info); - gst_buffer_resize (outbuf, offset, res); + /* make sure we allocate a new chunk next time (we do this only here because + * we look at map.size to see if the second memory chunk is needed above) */ + gst_memory_unmap (udpsrc->mem, &udpsrc->map); + udpsrc->vec[0].buffer = NULL; + udpsrc->vec[0].size = 0; + udpsrc->mem = NULL; + + offset = udpsrc->skip_first_bytes; + + if (G_UNLIKELY (offset > 0 && res < offset)) + goto skip_error; + + gst_buffer_resize (outbuf, offset, res - offset); /* use buffer metadata so receivers can also track the address */ if (saddr) { gst_buffer_add_net_address_meta (outbuf, saddr); g_object_unref (saddr); + saddr = NULL; } - saddr = NULL; - GST_LOG_OBJECT (udpsrc, "read %d bytes", (int) readsize); + GST_LOG_OBJECT (udpsrc, "read packet of %d bytes", (int) res); *buf = GST_BUFFER_CAST (outbuf); - return ret; + return GST_FLOW_OK; /* ERRORS */ memory_alloc_error: @@ -669,24 +636,8 @@ stopped: g_clear_error (&err); return GST_FLOW_FLUSHING; } -get_available_error: - { - GST_ELEMENT_ERROR (udpsrc, RESOURCE, READ, (NULL), - ("get available bytes failed")); - return GST_FLOW_ERROR; - } -alloc_failed: - { - GST_DEBUG ("Allocation failed"); - return ret; - } receive_error: { - if (outbuf != NULL) { - gst_buffer_unmap (outbuf, &info); - gst_buffer_unref (outbuf); - } - if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_BUSY) || g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) { g_clear_error (&err); @@ -700,7 +651,6 @@ receive_error: } skip_error: { - gst_buffer_unmap (outbuf, &info); gst_buffer_unref (outbuf); GST_ELEMENT_ERROR (udpsrc, STREAM, DECODE, (NULL),