gst/udp/gstudpsrc.*: Added property to post a message on timeout.

Original commit message from CVS:
* gst/udp/gstudpsrc.c: (gst_udpsrc_class_init), (gst_udpsrc_init),
(gst_udpsrc_create), (gst_udpsrc_set_property),
(gst_udpsrc_get_property), (gst_udpsrc_unlock), (gst_udpsrc_stop):
* gst/udp/gstudpsrc.h:
Added property to post a message on timeout.
Updated docs.
When restarting the select, initialize the fdsets again.
Init control sockets so we don't accidentally close a random socket.
API: GstUDPSrc::timeout property
This commit is contained in:
Wim Taymans 2006-09-29 11:09:40 +00:00
parent e8c59d9da3
commit fcd901a5bf
4 changed files with 104 additions and 35 deletions

View file

@ -1,3 +1,15 @@
2006-09-29 Wim Taymans <wim@fluendo.com>
* gst/udp/gstudpsrc.c: (gst_udpsrc_class_init), (gst_udpsrc_init),
(gst_udpsrc_create), (gst_udpsrc_set_property),
(gst_udpsrc_get_property), (gst_udpsrc_unlock), (gst_udpsrc_stop):
* gst/udp/gstudpsrc.h:
Added property to post a message on timeout.
Updated docs.
When restarting the select, initialize the fdsets again.
Init control sockets so we don't accidentally close a random socket.
API: GstUDPSrc::timeout property
2006-09-29 Wim Taymans <wim@fluendo.com>
* gst/rtsp/gstrtspsrc.c: (gst_rtsp_proto_get_type):

2
common

@ -1 +1 @@
Subproject commit bdd0108b3540ffadeb82cee28b8867a8a6e7ae78
Subproject commit 9991f6fa61ee11475c390dd6675ef7952f079e43

View file

@ -80,7 +80,25 @@
* URIs.
* </para>
* <para>
* Last reviewed on 2006-07-27 (0.10.4)
* If the <link linkend="GstUDPSrc--timeout">timeout property</link> is set to a
* value bigger than 0, udpsrc will generate an element message named
* <classname>&quot;GstUDPSrcTimeout&quot;</classname>:
* if no data was recieved in the given timeout.
* The message's structure contains one field:
* <itemizedlist>
* <listitem>
* <para>
* #guint64
* <classname>&quot;timeout&quot;</classname>: the timeout that expired when
* waiting for data.
* </para>
* </listitem>
* </itemizedlist>
* The message is typically used to detect that no UDP arrives in the receiver
* because it is blocked by a firewall.
* </para>
* <para>
* Last reviewed on 2006-09-29 (0.10.5)
* </para>
* </refsect2>
*/
@ -109,10 +127,10 @@ GST_DEBUG_CATEGORY_STATIC (udpsrc_debug);
#define WRITE_SOCKET(src) src->control_sock[1]
#define READ_SOCKET(src) src->control_sock[0]
#define SEND_COMMAND(src, command) \
G_STMT_START { \
unsigned char c; c = command; \
write (WRITE_SOCKET(src), &c, 1); \
#define SEND_COMMAND(src, command, res) \
G_STMT_START { \
unsigned char c; c = command; \
res = write (WRITE_SOCKET(src), &c, 1); \
} G_STMT_END
#define READ_COMMAND(src, command, res) \
@ -134,10 +152,11 @@ GST_ELEMENT_DETAILS ("UDP packet receiver",
#define UDP_DEFAULT_PORT 4951
#define UDP_DEFAULT_MULTICAST_GROUP "0.0.0.0"
#define UDP_DEFAULT_BUFFER_SIZE 0
#define UDP_DEFAULT_URI "udp://"UDP_DEFAULT_MULTICAST_GROUP":"G_STRINGIFY(UDP_DEFAULT_PORT)
#define UDP_DEFAULT_CAPS NULL
#define UDP_DEFAULT_SOCKFD -1
#define UDP_DEFAULT_BUFFER_SIZE 0
#define UDP_DEFAULT_TIMEOUT 0
enum
{
@ -147,7 +166,8 @@ enum
PROP_URI,
PROP_CAPS,
PROP_SOCKFD,
PROP_BUFFER_SIZE
PROP_BUFFER_SIZE,
PROP_TIMEOUT
};
static void gst_udpsrc_uri_handler_init (gpointer g_iface, gpointer iface_data);
@ -228,6 +248,10 @@ gst_udpsrc_class_init (GstUDPSrcClass * klass)
g_param_spec_int ("buffer-size", "Buffer Size",
"Size of the kernel receive buffer in bytes, 0=default", 0, G_MAXINT,
UDP_DEFAULT_BUFFER_SIZE, G_PARAM_READWRITE));
g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_TIMEOUT,
g_param_spec_uint64 ("timeout", "Timeout",
"Post a message after this timeout (in microseconds) (0 = disabled)",
0, G_MAXUINT64, UDP_DEFAULT_TIMEOUT, G_PARAM_READWRITE));
gstbasesrc_class->start = gst_udpsrc_start;
gstbasesrc_class->stop = gst_udpsrc_stop;
@ -248,6 +272,10 @@ gst_udpsrc_init (GstUDPSrc * udpsrc, GstUDPSrcClass * g_class)
udpsrc->multi_group = g_strdup (UDP_DEFAULT_MULTICAST_GROUP);
udpsrc->uri = g_strdup (UDP_DEFAULT_URI);
udpsrc->buffer_size = UDP_DEFAULT_BUFFER_SIZE;
udpsrc->timeout = UDP_DEFAULT_TIMEOUT;
udpsrc->control_sock[0] = -1;
udpsrc->control_sock[1] = -1;
}
static GstCaps *
@ -277,8 +305,7 @@ gst_udpsrc_create (GstPushSrc * psrc, GstBuffer ** buf)
#ifdef G_OS_UNIX
gint readsize;
#endif
#ifdef G_OS_WIN32
#elif defined G_OS_WIN32
gulong readsize;
#endif
gint ret;
@ -286,32 +313,43 @@ gst_udpsrc_create (GstPushSrc * psrc, GstBuffer ** buf)
udpsrc = GST_UDPSRC (psrc);
FD_ZERO (&read_fds);
FD_SET (udpsrc->sock, &read_fds);
#ifdef G_OS_UNIX
FD_SET (READ_SOCKET (udpsrc), &read_fds);
#endif
max_sock = MAX (udpsrc->sock, READ_SOCKET (udpsrc));
do {
gboolean stop;
struct timeval timeval, *timeout;
FD_ZERO (&read_fds);
FD_SET (udpsrc->sock, &read_fds);
#ifndef G_OS_WIN32
FD_SET (READ_SOCKET (udpsrc), &read_fds);
#endif
max_sock = MAX (udpsrc->sock, READ_SOCKET (udpsrc));
try_again = FALSE;
stop = FALSE;
GST_LOG_OBJECT (udpsrc, "doing select");
GST_LOG_OBJECT (udpsrc, "doing select, timeout %" G_GUINT64_FORMAT,
udpsrc->timeout);
if (udpsrc->timeout > 0) {
timeval.tv_sec = udpsrc->timeout / 1000;
timeval.tv_usec = (udpsrc->timeout % 1000) * 1000;
timeout = &timeval;
} else {
timeout = NULL;
}
#ifdef G_OS_WIN32
if (((max_sock + 1) != READ_SOCKET (udpsrc)) ||
((max_sock + 1) != WRITE_SOCKET (udpsrc))) {
ret = select (max_sock + 1, &read_fds, NULL, NULL, NULL);
ret = select (max_sock + 1, &read_fds, NULL, NULL, timeout);
} else {
ret = 1;
}
#else
ret = select (max_sock + 1, &read_fds, NULL, NULL, NULL);
ret = select (max_sock + 1, &read_fds, NULL, NULL, timeout);
#endif
GST_LOG_OBJECT (udpsrc, "select returned %d", ret);
if (ret <= 0) {
if (ret < 0) {
#ifdef G_OS_WIN32
if (WSAGetLastError () != WSAEINTR)
goto select_error;
@ -320,6 +358,13 @@ gst_udpsrc_create (GstPushSrc * psrc, GstBuffer ** buf)
goto select_error;
#endif
try_again = TRUE;
} else if (ret == 0) {
/* timeout, post element message */
gst_element_post_message (GST_ELEMENT_CAST (udpsrc),
gst_message_new_element (GST_OBJECT_CAST (udpsrc),
gst_structure_new ("GstUDPSrcTimeout",
"timeout", G_TYPE_UINT64, udpsrc->timeout, NULL)));
try_again = TRUE;
} else {
if (FD_ISSET (READ_SOCKET (udpsrc), &read_fds)) {
/* got control message */
@ -353,7 +398,8 @@ gst_udpsrc_create (GstPushSrc * psrc, GstBuffer ** buf)
goto stopped;
} while (try_again);
/* ask how much is available for reading on the socket */
/* ask how much is available for reading on the socket, this is exactly one
* UDP packet. */
if ((ret = IOCTL_SOCKET (udpsrc->sock, FIONREAD, &readsize)) < 0)
goto ioctl_failed;
@ -515,6 +561,9 @@ gst_udpsrc_set_property (GObject * object, guint prop_id, const GValue * value,
udpsrc->sock = g_value_get_int (value);
GST_DEBUG ("setting SOCKFD to %d", udpsrc->sock);
break;
case PROP_TIMEOUT:
udpsrc->timeout = g_value_get_uint64 (value);
break;
default:
break;
}
@ -545,6 +594,9 @@ gst_udpsrc_get_property (GObject * object, guint prop_id, GValue * value,
case PROP_SOCKFD:
g_value_set_int (value, udpsrc->sock);
break;
case PROP_TIMEOUT:
g_value_set_uint64 (value, udpsrc->timeout);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
@ -729,11 +781,13 @@ static gboolean
gst_udpsrc_unlock (GstBaseSrc * bsrc)
{
GstUDPSrc *src;
gint res;
src = GST_UDPSRC (bsrc);
GST_DEBUG ("sending stop command");
SEND_COMMAND (src, CONTROL_STOP);
SEND_COMMAND (src, CONTROL_STOP, res);
GST_DEBUG ("sent stop command %d", res);
return TRUE;
}
@ -745,6 +799,8 @@ gst_udpsrc_stop (GstBaseSrc * bsrc)
src = GST_UDPSRC (bsrc);
GST_DEBUG ("stopping, closing sockets");
if (src->sock != -1) {
CLOSE_SOCKET (src->sock);
src->sock = -1;

View file

@ -52,19 +52,20 @@ struct _GstUDPSrc {
GstPushSrc parent;
/* properties */
gchar *uri;
int port;
gchar *multi_group;
gint ttl;
gint buffer_size;
int sock;
int control_sock[2];
struct sockaddr_in myaddr;
struct ip_mreq multi_addr;
gchar *uri;
int port;
gchar *multi_group;
gint ttl;
GstCaps *caps;
gint buffer_size;
guint64 timeout;
/* our sockets */
int sock;
int control_sock[2];
struct sockaddr_in myaddr;
struct ip_mreq multi_addr;
};
struct _GstUDPSrcClass {