From 943f0445a965c319dcf31cc1e1839212aa479de4 Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Thu, 12 May 2005 10:45:25 +0000 Subject: [PATCH] gst/: Make UDP and TCP elements use PushSrc. Original commit message from CVS: * gst/rtsp/README: * gst/tcp/gsttcpclientsrc.c: (gst_tcpclientsrc_get_type), (gst_tcpclientsrc_base_init), (gst_tcpclientsrc_class_init), (gst_tcpclientsrc_init), (gst_tcpclientsrc_getcaps), (gst_tcpclientsrc_stop), (gst_tcpclientsrc_eos), (gst_tcpclientsrc_create), (gst_tcpclientsrc_start): * gst/tcp/gsttcpclientsrc.h: * gst/tcp/gsttcpserversrc.c: (gst_tcpserversrc_get_type), (gst_tcpserversrc_base_init), (gst_tcpserversrc_class_init), (gst_tcpserversrc_init), (gst_tcpserversrc_create), (gst_tcpserversrc_start), (gst_tcpserversrc_stop): * gst/tcp/gsttcpserversrc.h: * gst/tcp/gsttcpsrc.c: (gst_tcpsrc_get_type), (gst_tcpsrc_base_init), (gst_tcpsrc_class_init), (gst_tcpsrc_init), (gst_tcpsrc_create), (gst_tcpsrc_start), (gst_tcpsrc_stop): * gst/tcp/gsttcpsrc.h: * gst/udp/gstudpsink.c: (gst_udpsink_base_init), (gst_udpsink_init), (gst_udpsink_get_times), (gst_udpsink_render), (gst_udpsink_set_property), (gst_udpsink_get_property), (gst_udpsink_change_state): * gst/udp/gstudpsink.h: * gst/udp/gstudpsrc.c: (gst_udpsrc_get_type), (gst_udpsrc_base_init), (gst_udpsrc_class_init), (gst_udpsrc_init), (gst_udpsrc_create), (gst_udpsrc_set_uri), (gst_udpsrc_start), (gst_udpsrc_stop): * gst/udp/gstudpsrc.h: Make UDP and TCP elements use PushSrc. --- ChangeLog | 31 +++++++ gst/rtsp/README | 2 +- gst/udp/gstudpsink.c | 41 ++++----- gst/udp/gstudpsink.h | 1 - gst/udp/gstudpsrc.c | 199 +++++++++++++++---------------------------- gst/udp/gstudpsrc.h | 19 ++--- 6 files changed, 123 insertions(+), 170 deletions(-) diff --git a/ChangeLog b/ChangeLog index 9589d81736..0be548b256 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,34 @@ +2005-05-12 Wim Taymans + + * gst/rtsp/README: + * gst/tcp/gsttcpclientsrc.c: (gst_tcpclientsrc_get_type), + (gst_tcpclientsrc_base_init), (gst_tcpclientsrc_class_init), + (gst_tcpclientsrc_init), (gst_tcpclientsrc_getcaps), + (gst_tcpclientsrc_stop), (gst_tcpclientsrc_eos), + (gst_tcpclientsrc_create), (gst_tcpclientsrc_start): + * gst/tcp/gsttcpclientsrc.h: + * gst/tcp/gsttcpserversrc.c: (gst_tcpserversrc_get_type), + (gst_tcpserversrc_base_init), (gst_tcpserversrc_class_init), + (gst_tcpserversrc_init), (gst_tcpserversrc_create), + (gst_tcpserversrc_start), (gst_tcpserversrc_stop): + * gst/tcp/gsttcpserversrc.h: + * gst/tcp/gsttcpsrc.c: (gst_tcpsrc_get_type), + (gst_tcpsrc_base_init), (gst_tcpsrc_class_init), (gst_tcpsrc_init), + (gst_tcpsrc_create), (gst_tcpsrc_start), (gst_tcpsrc_stop): + * gst/tcp/gsttcpsrc.h: + * gst/udp/gstudpsink.c: (gst_udpsink_base_init), + (gst_udpsink_init), (gst_udpsink_get_times), (gst_udpsink_render), + (gst_udpsink_set_property), (gst_udpsink_get_property), + (gst_udpsink_change_state): + * gst/udp/gstudpsink.h: + * gst/udp/gstudpsrc.c: (gst_udpsrc_get_type), + (gst_udpsrc_base_init), (gst_udpsrc_class_init), (gst_udpsrc_init), + (gst_udpsrc_create), (gst_udpsrc_set_uri), (gst_udpsrc_start), + (gst_udpsrc_stop): + * gst/udp/gstudpsrc.h: + Make UDP and TCP elements use PushSrc. + + 2005-05-11 Tim-Philipp Müller * ext/mad/gstmad.c: (gst_mad_init), (gst_mad_src_query), diff --git a/gst/rtsp/README b/gst/rtsp/README index 62793a65e0..285f4f436b 100644 --- a/gst/rtsp/README +++ b/gst/rtsp/README @@ -137,7 +137,7 @@ An RTSP session is created as follows: +---------------------------------------------+ | +------------+ | | | _loop() | +--------+ | - | | ----- rtpdec -------------------- + | | ----- rtpses -------------------- | | | | | | | | | | | +------------+ | | | ----- RTCP ---- udpsink | | diff --git a/gst/udp/gstudpsink.c b/gst/udp/gstudpsink.c index 93fb654bdd..0d57cd661a 100644 --- a/gst/udp/gstudpsink.c +++ b/gst/udp/gstudpsink.c @@ -50,8 +50,7 @@ enum ARG_0, ARG_HOST, ARG_PORT, - ARG_MTU - /* FILL ME */ + /* FILL ME */ }; static void gst_udpsink_base_init (gpointer g_class); @@ -146,7 +145,6 @@ gst_udpsink_init (GstUDPSink * udpsink) { udpsink->host = g_strdup (UDP_DEFAULT_HOST); udpsink->port = UDP_DEFAULT_PORT; - udpsink->mtu = 1024; } static void @@ -161,30 +159,31 @@ static GstFlowReturn gst_udpsink_render (GstBaseSink * sink, GstBuffer * buffer) { GstUDPSink *udpsink; - gint tosend; + gint ret, size; guint8 *data; udpsink = GST_UDPSINK (sink); - tosend = GST_BUFFER_SIZE (buffer); + size = GST_BUFFER_SIZE (buffer); data = GST_BUFFER_DATA (buffer); - /* send in chunks of MTU */ - while (tosend > 0) { - gint psize; + while (TRUE) { + ret = sendto (udpsink->sock, data, size, 0, + (struct sockaddr *) &udpsink->theiraddr, sizeof (udpsink->theiraddr)); - psize = MIN (udpsink->mtu, tosend); - if (sendto (udpsink->sock, data, psize, 0, - (struct sockaddr *) &udpsink->theiraddr, - sizeof (udpsink->theiraddr)) == -1) { - perror ("sending"); - } - - data += psize; - tosend -= psize; + if (ret < 0) { + if (errno != EINTR && errno != EAGAIN) + goto send_error; + } else + break; } - return GST_FLOW_OK; + +send_error: + { + GST_DEBUG ("got send error"); + return GST_FLOW_ERROR; + } } static void @@ -207,9 +206,6 @@ gst_udpsink_set_property (GObject * object, guint prop_id, const GValue * value, case ARG_PORT: udpsink->port = g_value_get_int (value); break; - case ARG_MTU: - udpsink->mtu = g_value_get_int (value); - break; default: break; } @@ -230,9 +226,6 @@ gst_udpsink_get_property (GObject * object, guint prop_id, GValue * value, case ARG_PORT: g_value_set_int (value, udpsink->port); break; - case ARG_MTU: - g_value_set_int (value, udpsink->mtu); - break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; diff --git a/gst/udp/gstudpsink.h b/gst/udp/gstudpsink.h index 0f65a2545d..e5480c3463 100644 --- a/gst/udp/gstudpsink.h +++ b/gst/udp/gstudpsink.h @@ -59,7 +59,6 @@ struct _GstUDPSink { gint port; gchar *host; - guint mtu; }; struct _GstUDPSinkClass { diff --git a/gst/udp/gstudpsrc.c b/gst/udp/gstudpsrc.c index 120d0c5508..76c4b73a55 100644 --- a/gst/udp/gstudpsrc.c +++ b/gst/udp/gstudpsrc.c @@ -63,9 +63,9 @@ static void gst_udpsrc_init (GstUDPSrc * udpsrc); static void gst_udpsrc_uri_handler_init (gpointer g_iface, gpointer iface_data); -static void gst_udpsrc_loop (GstPad * pad); -static GstElementStateReturn gst_udpsrc_change_state (GstElement * element); -static gboolean gst_udpsrc_activate (GstPad * pad, GstActivateMode mode); +static GstFlowReturn gst_udpsrc_create (GstPushSrc * psrc, GstBuffer ** buf); +static gboolean gst_udpsrc_start (GstBaseSrc * bsrc); +static gboolean gst_udpsrc_stop (GstBaseSrc * bsrc); static void gst_udpsrc_set_property (GObject * object, guint prop_id, const GValue * value, GParamSpec * pspec); @@ -101,7 +101,7 @@ gst_udpsrc_get_type (void) }; udpsrc_type = - g_type_register_static (GST_TYPE_ELEMENT, "GstUDPSrc", &udpsrc_info, 0); + g_type_register_static (GST_TYPE_PUSHSRC, "GstUDPSrc", &udpsrc_info, 0); g_type_add_interface_static (udpsrc_type, GST_TYPE_URI_HANDLER, &urihandler_info); @@ -125,11 +125,15 @@ gst_udpsrc_class_init (GstUDPSrc * klass) { GObjectClass *gobject_class; GstElementClass *gstelement_class; + GstBaseSrcClass *gstbasesrc_class; + GstPushSrcClass *gstpushsrc_class; gobject_class = (GObjectClass *) klass; gstelement_class = (GstElementClass *) klass; + gstbasesrc_class = (GstBaseSrcClass *) klass; + gstpushsrc_class = (GstPushSrcClass *) klass; - parent_class = g_type_class_ref (GST_TYPE_ELEMENT); + parent_class = g_type_class_ref (GST_TYPE_PUSHSRC); gobject_class->set_property = gst_udpsrc_set_property; gobject_class->get_property = gst_udpsrc_get_property; @@ -147,27 +151,22 @@ gst_udpsrc_class_init (GstUDPSrc * klass) "URI in the form of udp://hostname:port", UDP_DEFAULT_URI, G_PARAM_READWRITE)); - gstelement_class->change_state = gst_udpsrc_change_state; + gstbasesrc_class->start = gst_udpsrc_start; + gstbasesrc_class->stop = gst_udpsrc_stop; + gstpushsrc_class->create = gst_udpsrc_create; } static void gst_udpsrc_init (GstUDPSrc * udpsrc) { - /* create the src and src pads */ - udpsrc->srcpad = gst_pad_new_from_template - (gst_static_pad_template_get (&src_template), "src"); - gst_pad_set_activate_function (udpsrc->srcpad, gst_udpsrc_activate); - gst_pad_set_loop_function (udpsrc->srcpad, gst_udpsrc_loop); - gst_element_add_pad (GST_ELEMENT (udpsrc), udpsrc->srcpad); - udpsrc->port = UDP_DEFAULT_PORT; udpsrc->sock = -1; udpsrc->multi_group = g_strdup (UDP_DEFAULT_MULTICAST_GROUP); udpsrc->uri = g_strdup (UDP_DEFAULT_URI); } -static void -gst_udpsrc_loop (GstPad * pad) +static GstFlowReturn +gst_udpsrc_create (GstPushSrc * psrc, GstBuffer ** buf) { GstUDPSrc *udpsrc; GstBuffer *outbuf; @@ -176,55 +175,51 @@ gst_udpsrc_loop (GstPad * pad) gint numbytes; fd_set read_fds; guint max_sock; + gchar *pktdata; + gint pktsize; - udpsrc = GST_UDPSRC (GST_OBJECT_PARENT (pad)); + udpsrc = GST_UDPSRC (psrc); FD_ZERO (&read_fds); FD_SET (udpsrc->sock, &read_fds); max_sock = udpsrc->sock; - GST_STREAM_LOCK (pad); - /* FIXME, add another socket to unblock */ if (select (max_sock + 1, &read_fds, NULL, NULL, NULL) < 0) goto select_error; - outbuf = gst_buffer_new (); - GST_BUFFER_DATA (outbuf) = g_malloc (24000); - GST_BUFFER_SIZE (outbuf) = 24000; + pktdata = g_malloc (24000); + pktsize = 24000; len = sizeof (struct sockaddr); - if ((numbytes = recvfrom (udpsrc->sock, GST_BUFFER_DATA (outbuf), - GST_BUFFER_SIZE (outbuf), 0, (struct sockaddr *) &tmpaddr, - &len)) == -1) - goto receive_error; + while (TRUE) { + numbytes = recvfrom (udpsrc->sock, pktdata, pktsize, + 0, (struct sockaddr *) &tmpaddr, &len); + if (numbytes < 0) { + if (errno != EAGAIN && errno != EINTR) + goto receive_error; + } else + break; + } + outbuf = gst_buffer_new (); + GST_BUFFER_DATA (outbuf) = pktdata; GST_BUFFER_SIZE (outbuf) = numbytes; - if (gst_pad_push (udpsrc->srcpad, outbuf) != GST_FLOW_OK) - goto need_pause; - GST_STREAM_UNLOCK (pad); + *buf = outbuf; - return; + return GST_FLOW_OK; select_error: { - GST_STREAM_UNLOCK (pad); GST_DEBUG ("got select error"); - return; + return GST_FLOW_ERROR; } receive_error: { - GST_STREAM_UNLOCK (pad); gst_buffer_unref (outbuf); GST_DEBUG ("got receive error"); - return; - } -need_pause: - { - gst_task_pause (GST_RPAD_TASK (pad)); - GST_STREAM_UNLOCK (pad); - return; + return GST_FLOW_ERROR; } } @@ -315,12 +310,15 @@ gst_udpsrc_get_property (GObject * object, guint prop_id, GValue * value, /* create a socket for sending to remote machine */ static gboolean -gst_udpsrc_init_receive (GstUDPSrc * src) +gst_udpsrc_start (GstBaseSrc * bsrc) { guint bc_val; gint reuse = 1; struct sockaddr_in my_addr; int len, port; + GstUDPSrc *src; + + src = GST_UDPSRC (bsrc); memset (&src->myaddr, 0, sizeof (src->myaddr)); src->myaddr.sin_family = AF_INET; /* host byte order */ @@ -328,15 +326,15 @@ gst_udpsrc_init_receive (GstUDPSrc * src) src->myaddr.sin_addr.s_addr = INADDR_ANY; if ((src->sock = socket (AF_INET, SOCK_DGRAM, 0)) < 0) - goto error; + goto no_socket; if (setsockopt (src->sock, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof (reuse)) < 0) - goto error; + goto setsockopt_error; if (bind (src->sock, (struct sockaddr *) &src->myaddr, sizeof (src->myaddr)) < 0) - goto error; + goto bind_error; if (inet_aton (src->multi_group, &(src->multi_addr.imr_multiaddr))) { if (src->multi_addr.imr_multiaddr.s_addr) { @@ -347,7 +345,9 @@ gst_udpsrc_init_receive (GstUDPSrc * src) } len = sizeof (my_addr); - getsockname (src->sock, (struct sockaddr *) &my_addr, &len); + if (getsockname (src->sock, (struct sockaddr *) &my_addr, &len) < 0) + goto getsockname_error; + port = ntohs (my_addr.sin_port); if (port != src->port) { src->port = port; @@ -360,102 +360,41 @@ gst_udpsrc_init_receive (GstUDPSrc * src) return TRUE; -error: + /* ERRORS */ +no_socket: { - perror ("open"); + GST_DEBUG ("no_socket"); + return FALSE; + } +setsockopt_error: + { + GST_DEBUG ("setsockopt failed"); + return FALSE; + } +bind_error: + { + GST_DEBUG ("bind failed"); + return FALSE; + } +getsockname_error: + { + GST_DEBUG ("getsockname failed"); return FALSE; } } -static void -gst_udpsrc_close (GstUDPSrc * src) +static gboolean +gst_udpsrc_stop (GstBaseSrc * bsrc) { + GstUDPSrc *src; + + src = GST_UDPSRC (bsrc); + if (src->sock != -1) { close (src->sock); src->sock = -1; } -} - -static gboolean -gst_udpsrc_activate (GstPad * pad, GstActivateMode mode) -{ - gboolean result; - GstUDPSrc *udpsrc; - - udpsrc = GST_UDPSRC (GST_OBJECT_PARENT (pad)); - - switch (mode) { - case GST_ACTIVATE_PUSH: - /* if we have a scheduler we can start the task */ - if (GST_ELEMENT_SCHEDULER (udpsrc)) { - GST_STREAM_LOCK (pad); - GST_RPAD_TASK (pad) = - gst_scheduler_create_task (GST_ELEMENT_SCHEDULER (udpsrc), - (GstTaskFunction) gst_udpsrc_loop, pad); - - gst_task_start (GST_RPAD_TASK (pad)); - GST_STREAM_UNLOCK (pad); - result = TRUE; - } - break; - case GST_ACTIVATE_PULL: - result = FALSE; - break; - case GST_ACTIVATE_NONE: - /* step 1, unblock clock sync (if any) */ - - /* step 2, make sure streaming finishes */ - GST_STREAM_LOCK (pad); - gst_udpsrc_close (udpsrc); - - /* step 3, stop the task */ - if (GST_RPAD_TASK (pad)) { - gst_task_stop (GST_RPAD_TASK (pad)); - gst_object_unref (GST_OBJECT (GST_RPAD_TASK (pad))); - GST_RPAD_TASK (pad) = NULL; - } - GST_STREAM_UNLOCK (pad); - - result = TRUE; - break; - } - return result; -} - -static GstElementStateReturn -gst_udpsrc_change_state (GstElement * element) -{ - GstElementStateReturn ret; - GstUDPSrc *src; - gint transition; - - src = GST_UDPSRC (element); - - transition = GST_STATE_TRANSITION (element); - - switch (transition) { - case GST_STATE_READY_TO_PAUSED: - if (!gst_udpsrc_init_receive (src)) - goto no_init; - break; - default: - break; - } - - ret = GST_ELEMENT_CLASS (parent_class)->change_state (element); - - switch (transition) { - default: - break; - } - - return ret; - -no_init: - { - GST_DEBUG ("could not init udp socket"); - return GST_STATE_FAILURE; - } + return TRUE; } /*** GSTURIHANDLER INTERFACE *************************************************/ diff --git a/gst/udp/gstudpsrc.h b/gst/udp/gstudpsrc.h index a347eb2ab5..c39cbe71a3 100644 --- a/gst/udp/gstudpsrc.h +++ b/gst/udp/gstudpsrc.h @@ -22,10 +22,9 @@ #define __GST_UDPSRC_H__ #include +#include -#ifdef __cplusplus -extern "C" { -#endif /* __cplusplus */ +G_BEGIN_DECLS #include #include @@ -53,14 +52,9 @@ typedef struct _GstUDPSrc GstUDPSrc; typedef struct _GstUDPSrcClass GstUDPSrcClass; struct _GstUDPSrc { - GstElement element; - - /* pads */ - GstPad *sinkpad, - *srcpad; + GstPushSrc parent; gchar *uri; - int port; int sock; gchar *multi_group; @@ -74,15 +68,12 @@ struct _GstUDPSrc { }; struct _GstUDPSrcClass { - GstElementClass parent_class; + GstPushSrcClass parent_class; }; GType gst_udpsrc_get_type(void); - -#ifdef __cplusplus -} -#endif /* __cplusplus */ +G_END_DECLS #endif /* __GST_UDPSRC_H__ */