diff --git a/ChangeLog b/ChangeLog index fcbb4279ea..c48b280e89 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,18 @@ +2005-01-13 Thomas Vander Stichele + + * ext/ogg/gstoggmux.c: + eos/bos debugging + * gst/tcp/gstmultifdsink.c: + * gst/tcp/gstmultifdsink.h: + * gst/tcp/gsttcp.c: + * gst/tcp/gsttcp.h: + * gst/tcp/gsttcpclientsink.c: + * gst/tcp/gsttcpclientsrc.c: + * gst/tcp/gsttcpserversink.c: + * gst/tcp/gsttcpserversrc.c: + improve reusability of elements after state changes and errors + make multifdsink throw away streamheaders when receiving new ones + 2005-01-13 Ronald S. Bultje * ext/alsa/gstalsa.c: (gst_alsa_rates_probe): diff --git a/ext/ogg/gstoggmux.c b/ext/ogg/gstoggmux.c index 164970b66a..9c13e5ecb3 100644 --- a/ext/ogg/gstoggmux.c +++ b/ext/ogg/gstoggmux.c @@ -1013,6 +1013,11 @@ gst_ogg_mux_loop (GstElement * element) pad->prev_delta = delta_unit; /* swap the packet in */ + if (packet.e_o_s == 1) + GST_DEBUG_OBJECT (pad, "swapping in EOS packet"); + if (packet.b_o_s == 1) + GST_DEBUG_OBJECT (pad, "swapping in BOS packet"); + ogg_stream_packetin (&pad->stream, &packet); /* don't need the old buffer anymore */ diff --git a/gst/tcp/gstmultifdsink.c b/gst/tcp/gstmultifdsink.c index 3a3897b0f9..46bb677ccd 100644 --- a/gst/tcp/gstmultifdsink.c +++ b/gst/tcp/gstmultifdsink.c @@ -85,12 +85,12 @@ enum LAST_SIGNAL }; -/* this is really arbitrary choosen */ +/* this is really arbitrarily chosen */ #define DEFAULT_PROTOCOL GST_TCP_PROTOCOL_TYPE_NONE #define DEFAULT_MODE GST_FDSET_MODE_POLL #define DEFAULT_BUFFERS_MAX -1 #define DEFAULT_BUFFERS_SOFT_MAX -1 -#define DEFAULT_UNIT_TYPE GST_UNIT_TYPE_BUFFERS +#define DEFAULT_UNIT_TYPE GST_UNIT_TYPE_BUFFERS #define DEFAULT_UNITS_MAX -1 #define DEFAULT_UNITS_SOFT_MAX -1 #define DEFAULT_RECOVER_POLICY GST_RECOVER_POLICY_NONE @@ -948,6 +948,8 @@ gst_multifdsink_handle_client_write (GstMultiFdSink * sink, /* if we have streamheader buffers, and haven't sent them to this client * yet, send them out one by one */ if (!client->streamheader_sent) { + GST_DEBUG_OBJECT (sink, "[fd %5d] Sending streamheader, %d buffers", fd, + g_slist_length (sink->streamheader)); if (sink->streamheader) { GSList *l; @@ -1458,6 +1460,18 @@ gst_multifdsink_chain (GstPad * pad, GstData * _data) return; } + GST_LOG_OBJECT (sink, "received buffer %p", buf); + /* if we get IN_CAPS buffers, but the previous buffer was not IN_CAPS, + * it means we're getting new streamheader buffers, and we should clear + * the old ones */ + if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_IN_CAPS) && + sink->previous_buffer_in_caps == FALSE) { + GST_DEBUG_OBJECT (sink, + "receiving new IN_CAPS buffers, clearing old streamheader"); + g_slist_foreach (sink->streamheader, (GFunc) gst_data_unref, NULL); + g_slist_free (sink->streamheader); + sink->streamheader = NULL; + } /* if the incoming buffer is marked as IN CAPS, then we assume for now * it's a streamheader that needs to be sent to each new client, so we * put it on our internal list of streamheader buffers. @@ -1465,6 +1479,7 @@ gst_multifdsink_chain (GstPad * pad, GstData * _data) * non IN_CAPS buffers so we properly keep track of clients that got * streamheaders. */ if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_IN_CAPS)) { + sink->previous_buffer_in_caps = TRUE; GST_DEBUG_OBJECT (sink, "appending IN_CAPS buffer with length %d to streamheader", GST_BUFFER_SIZE (buf)); @@ -1472,6 +1487,7 @@ gst_multifdsink_chain (GstPad * pad, GstData * _data) return; } + sink->previous_buffer_in_caps = FALSE; /* queue the buffer */ gst_multifdsink_queue_buffer (sink, buf); diff --git a/gst/tcp/gstmultifdsink.h b/gst/tcp/gstmultifdsink.h index aef304816f..6b8fc0b17f 100644 --- a/gst/tcp/gstmultifdsink.h +++ b/gst/tcp/gstmultifdsink.h @@ -115,7 +115,7 @@ typedef struct { guint64 last_activity_time; guint64 dropped_buffers; guint64 avg_queue_size; - + } GstTCPClient; struct _GstMultiFdSink { @@ -130,13 +130,15 @@ struct _GstMultiFdSink { GMutex *clientslock; /* lock to protect the clients list */ GList *clients; /* list of clients we are serving */ GHashTable *fd_hash; /* index on fd to client */ - + GstFDSetMode mode; GstFDSet *fdset; GstFD control_sock[2];/* sockets for controlling the select call */ GSList *streamheader; /* GSList of GstBuffers to use as streamheader */ + gboolean previous_buffer_in_caps; + GstTCPProtocolType protocol; guint mtu; diff --git a/gst/tcp/gsttcp.c b/gst/tcp/gsttcp.c index 9db96de7c0..4e13956439 100644 --- a/gst/tcp/gsttcp.c +++ b/gst/tcp/gsttcp.c @@ -45,7 +45,8 @@ GST_DEBUG_CATEGORY_EXTERN (tcp_debug); /* resolve host to IP address, throwing errors if it fails */ /* host can already be an IP address */ -/* returns a newly allocated gchar * with the dotted ip address */ +/* returns a newly allocated gchar * with the dotted ip address, + or NULL, in which case it already fired an error. */ gchar * gst_tcp_host_to_ip (GstElement * element, const gchar * host) { @@ -140,9 +141,20 @@ gst_tcp_socket_read (int socket, void *buf, size_t count) return bytes_read; } +/* close the socket and reset the fd. Used to clean up after errors. */ +void +gst_tcp_socket_close (int *socket) +{ + close (*socket); + *socket = -1; +} + /* read the gdp buffer header from the given socket - * returns a GstData, - * representing the new GstBuffer to read data into, or an EOS event + * returns: + * - a GstData representing a GstBuffer in which data should be read + * - a GstData representing a GstEvent + * - NULL, indicating a connection close or an error, to be handled with + * EOS */ GstData * gst_tcp_gdp_read_header (GstElement * this, int socket) @@ -160,10 +172,9 @@ gst_tcp_gdp_read_header (GstElement * this, int socket) ret = gst_tcp_socket_read (socket, header, readsize); /* if we read 0 bytes, and we're blocking, we hit eos */ if (ret == 0) { - GST_DEBUG ("blocking read returns 0, EOS"); + GST_DEBUG ("blocking read returns 0, returning NULL"); g_free (header); - gst_element_set_eos (GST_ELEMENT (this)); - return GST_DATA (gst_event_new (GST_EVENT_EOS)); + return NULL; } if (ret < 0) { GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM); diff --git a/gst/tcp/gsttcp.h b/gst/tcp/gsttcp.h index 84b7d32e6e..cce6ad21ff 100644 --- a/gst/tcp/gsttcp.h +++ b/gst/tcp/gsttcp.h @@ -44,6 +44,8 @@ gchar * gst_tcp_host_to_ip (GstElement *element, const gchar *host); gint gst_tcp_socket_write (int socket, const void *buf, size_t count); gint gst_tcp_socket_read (int socket, void *buf, size_t count); +void gst_tcp_socket_close (int *socket); + GstData * gst_tcp_gdp_read_header (GstElement *this, int socket); GstCaps * gst_tcp_gdp_read_caps (GstElement *this, int socket); diff --git a/gst/tcp/gsttcpclientsink.c b/gst/tcp/gsttcpclientsink.c index ac0b76c3b7..8cccb5a82d 100644 --- a/gst/tcp/gsttcpclientsink.c +++ b/gst/tcp/gsttcpclientsink.c @@ -251,6 +251,10 @@ gst_tcpclientsink_set_property (GObject * object, guint prop_id, switch (prop_id) { case ARG_HOST: + if (!g_value_get_string (value)) { + g_warning ("host property cannot be NULL"); + break; + } g_free (tcpclientsink->host); tcpclientsink->host = g_strdup (g_value_get_string (value)); break; @@ -317,8 +321,10 @@ gst_tcpclientsink_init_send (GstTCPClientSink * this) /* look up name if we need to */ ip = gst_tcp_host_to_ip (GST_ELEMENT (this), this->host); - if (!ip) + if (!ip) { + gst_tcp_socket_close (&this->sock_fd); return FALSE; + } GST_DEBUG_OBJECT (this, "IP address for host %s is %s", this->host, ip); /* connect to server */ @@ -333,6 +339,7 @@ gst_tcpclientsink_init_send (GstTCPClientSink * this) sizeof (this->server_sin)); if (ret) { + gst_tcp_socket_close (&this->sock_fd); switch (errno) { case ECONNREFUSED: GST_ELEMENT_ERROR (this, RESOURCE, OPEN_WRITE, diff --git a/gst/tcp/gsttcpclientsrc.c b/gst/tcp/gsttcpclientsrc.c index 446c437622..987eb52935 100644 --- a/gst/tcp/gsttcpclientsrc.c +++ b/gst/tcp/gsttcpclientsrc.c @@ -201,6 +201,36 @@ gst_tcpclientsrc_getcaps (GstPad * pad) return caps; } +/* close the socket and associated resources + * unset OPEN flag + * used both to recover from errors and go to NULL state */ +static void +gst_tcpclientsrc_close (GstTCPClientSrc * this) +{ + GST_DEBUG_OBJECT (this, "closing socket"); + if (this->sock_fd != -1) { + close (this->sock_fd); + this->sock_fd = -1; + } + this->caps_received = FALSE; + if (this->caps) { + gst_caps_free (this->caps); + this->caps = NULL; + } + GST_FLAG_UNSET (this, GST_TCPCLIENTSRC_OPEN); +} + +/* close socket and related items and return an EOS GstData + * called from _get */ +static GstData * +gst_tcpclientsrc_eos (GstTCPClientSrc * src) +{ + GST_DEBUG_OBJECT (src, "going to EOS"); + gst_element_set_eos (GST_ELEMENT (src)); + gst_tcpclientsrc_close (src); + return GST_DATA (gst_event_new (GST_EVENT_EOS)); +} + static GstData * gst_tcpclientsrc_get (GstPad * pad) { @@ -214,14 +244,18 @@ gst_tcpclientsrc_get (GstPad * pad) g_return_val_if_fail (pad != NULL, NULL); g_return_val_if_fail (GST_IS_PAD (pad), NULL); src = GST_TCPCLIENTSRC (GST_OBJECT_PARENT (pad)); - g_return_val_if_fail (GST_FLAG_IS_SET (src, GST_TCPCLIENTSRC_OPEN), NULL); + if (!GST_FLAG_IS_SET (src, GST_TCPCLIENTSRC_OPEN)) { + GST_DEBUG_OBJECT (src, "connection to server closed, cannot give data"); + return NULL; + } + GST_LOG_OBJECT (src, "asked for a buffer"); /* try to negotiate here */ if (!gst_pad_is_negotiated (pad)) { if (GST_PAD_LINK_FAILED (gst_pad_renegotiate (pad))) { GST_ELEMENT_ERROR (src, CORE, NEGOTIATION, (NULL), GST_ERROR_SYSTEM); gst_buffer_unref (buf); - return GST_DATA (gst_event_new (GST_EVENT_EOS)); + return gst_tcpclientsrc_eos (src); } } @@ -252,7 +286,7 @@ gst_tcpclientsrc_get (GstPad * pad) if (ret <= 0) { GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL), ("select failed: %s", g_strerror (errno))); - return GST_DATA (gst_event_new (GST_EVENT_EOS)); + return gst_tcpclientsrc_eos (src); } /* ask how much is available for reading on the socket */ @@ -260,19 +294,24 @@ gst_tcpclientsrc_get (GstPad * pad) if (ret < 0) { GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL), ("ioctl failed: %s", g_strerror (errno))); - return GST_DATA (gst_event_new (GST_EVENT_EOS)); + return gst_tcpclientsrc_eos (src); } GST_LOG_OBJECT (src, "ioctl says %d bytes available", readsize); buf = gst_buffer_new_and_alloc (readsize); break; case GST_TCP_PROTOCOL_TYPE_GDP: if (!(data = gst_tcp_gdp_read_header (GST_ELEMENT (src), src->sock_fd))) { - GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL), - ("Could not read data header through GDP")); - return GST_DATA (gst_event_new (GST_EVENT_EOS)); + return gst_tcpclientsrc_eos (src); } - if (GST_IS_EVENT (data)) + if (GST_IS_EVENT (data)) { + /* if we got back an EOS event, then we should go into eos ourselves */ + if (GST_EVENT_TYPE (data) == GST_EVENT_EOS) { + gst_event_unref (data); + return gst_tcpclientsrc_eos (src); + } return data; + } + buf = GST_BUFFER (data); GST_LOG_OBJECT (src, "Going to read data from socket into buffer %p", @@ -285,20 +324,19 @@ gst_tcpclientsrc_get (GstPad * pad) break; } - GST_LOG_OBJECT (src, "Reading %d bytes", readsize); + GST_LOG_OBJECT (src, "Reading %d bytes into buffer", readsize); ret = gst_tcp_socket_read (src->sock_fd, GST_BUFFER_DATA (buf), readsize); if (ret < 0) { GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM); gst_buffer_unref (buf); - return GST_DATA (gst_event_new (GST_EVENT_EOS)); + return gst_tcpclientsrc_eos (src); } /* if we read 0 bytes, and we're blocking, we hit eos */ if (ret == 0) { - GST_DEBUG ("blocking read returns 0, EOS"); + GST_DEBUG_OBJECT (src, "blocking read returns 0, EOS"); gst_buffer_unref (buf); - gst_element_set_eos (GST_ELEMENT (src)); - return GST_DATA (gst_event_new (GST_EVENT_EOS)); + return gst_tcpclientsrc_eos (src); } readsize = ret; @@ -360,6 +398,10 @@ gst_tcpclientsrc_set_property (GObject * object, guint prop_id, switch (prop_id) { case ARG_HOST: + if (!g_value_get_string (value)) { + g_warning ("host property cannot be NULL"); + break; + } g_free (tcpclientsrc->host); tcpclientsrc->host = g_strdup (g_value_get_string (value)); break; @@ -418,11 +460,14 @@ gst_tcpclientsrc_init_receive (GstTCPClientSrc * this) } GST_DEBUG_OBJECT (this, "opened receiving client socket with fd %d", this->sock_fd); + GST_FLAG_SET (this, GST_TCPCLIENTSRC_OPEN); /* look up name if we need to */ ip = gst_tcp_host_to_ip (GST_ELEMENT (this), this->host); - if (!ip) + if (!ip) { + gst_tcpclientsrc_close (this); return FALSE; + } GST_DEBUG_OBJECT (this, "IP address for host %s is %s", this->host, ip); /* connect to server */ @@ -437,6 +482,7 @@ gst_tcpclientsrc_init_receive (GstTCPClientSrc * this) sizeof (this->server_sin)); if (ret) { + gst_tcpclientsrc_close (this); switch (errno) { case ECONNREFUSED: GST_ELEMENT_ERROR (this, RESOURCE, OPEN_READ, @@ -455,7 +501,6 @@ gst_tcpclientsrc_init_receive (GstTCPClientSrc * this) this->send_discont = TRUE; this->buffer_after_discont = NULL; - GST_FLAG_SET (this, GST_TCPCLIENTSRC_OPEN); /* get the caps if we're using GDP */ if (this->protocol == GST_TCP_PROTOCOL_TYPE_GDP) { @@ -465,11 +510,13 @@ gst_tcpclientsrc_init_receive (GstTCPClientSrc * this) GST_DEBUG_OBJECT (this, "getting caps through GDP"); if (!(caps = gst_tcp_gdp_read_caps (GST_ELEMENT (this), this->sock_fd))) { + gst_tcpclientsrc_close (this); GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL), ("Could not read caps through GDP")); return FALSE; } if (!GST_IS_CAPS (caps)) { + gst_tcpclientsrc_close (this); GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL), ("Could not read caps through GDP")); return FALSE; @@ -483,34 +530,21 @@ gst_tcpclientsrc_init_receive (GstTCPClientSrc * this) return TRUE; } -static void -gst_tcpclientsrc_close (GstTCPClientSrc * this) -{ - if (this->sock_fd != -1) { - close (this->sock_fd); - this->sock_fd = -1; - } - this->caps_received = FALSE; - if (this->caps) { - gst_caps_free (this->caps); - this->caps = NULL; - } - GST_FLAG_UNSET (this, GST_TCPCLIENTSRC_OPEN); -} - static GstElementStateReturn gst_tcpclientsrc_change_state (GstElement * element) { g_return_val_if_fail (GST_IS_TCPCLIENTSRC (element), GST_STATE_FAILURE); - if (GST_STATE_PENDING (element) == GST_STATE_NULL) { - if (GST_FLAG_IS_SET (element, GST_TCPCLIENTSRC_OPEN)) - gst_tcpclientsrc_close (GST_TCPCLIENTSRC (element)); - } else { - if (!GST_FLAG_IS_SET (element, GST_TCPCLIENTSRC_OPEN)) { - if (!gst_tcpclientsrc_init_receive (GST_TCPCLIENTSRC (element))) - return GST_STATE_FAILURE; - } + /* if open and going to NULL, close it */ + if (GST_FLAG_IS_SET (element, GST_TCPCLIENTSRC_OPEN) && + GST_STATE_PENDING (element) == GST_STATE_NULL) { + gst_tcpclientsrc_close (GST_TCPCLIENTSRC (element)); + } + /* if closed and going to a state higher than NULL, open it */ + if (!GST_FLAG_IS_SET (element, GST_TCPCLIENTSRC_OPEN) && + GST_STATE_PENDING (element) > GST_STATE_NULL) { + if (!gst_tcpclientsrc_init_receive (GST_TCPCLIENTSRC (element))) + return GST_STATE_FAILURE; } if (GST_ELEMENT_CLASS (parent_class)->change_state) diff --git a/gst/tcp/gsttcpserversink.c b/gst/tcp/gsttcpserversink.c index e6a038c063..453276502a 100644 --- a/gst/tcp/gsttcpserversink.c +++ b/gst/tcp/gsttcpserversink.c @@ -218,6 +218,10 @@ gst_tcpserversink_set_property (GObject * object, guint prop_id, switch (prop_id) { case ARG_HOST: + if (!g_value_get_string (value)) { + g_warning ("host property cannot be NULL"); + break; + } g_free (sink->host); sink->host = g_strdup (g_value_get_string (value)); break; @@ -273,6 +277,7 @@ gst_tcpserversink_init_send (GstMultiFdSink * parent) /* make address reusable */ if (setsockopt (this->server_sock.fd, SOL_SOCKET, SO_REUSEADDR, &ret, sizeof (int)) < 0) { + gst_tcp_socket_close (&this->server_sock.fd); GST_ELEMENT_ERROR (this, RESOURCE, SETTINGS, (NULL), ("Could not setsockopt: %s", g_strerror (errno))); return FALSE; @@ -280,6 +285,7 @@ gst_tcpserversink_init_send (GstMultiFdSink * parent) /* keep connection alive; avoids SIGPIPE during write */ if (setsockopt (this->server_sock.fd, SOL_SOCKET, SO_KEEPALIVE, &ret, sizeof (int)) < 0) { + gst_tcp_socket_close (&this->server_sock.fd); GST_ELEMENT_ERROR (this, RESOURCE, SETTINGS, (NULL), ("Could not setsockopt: %s", g_strerror (errno))); return FALSE; @@ -297,6 +303,7 @@ gst_tcpserversink_init_send (GstMultiFdSink * parent) sizeof (this->server_sin)); if (ret) { + gst_tcp_socket_close (&this->server_sock.fd); switch (errno) { default: GST_ELEMENT_ERROR (this, RESOURCE, OPEN_READ, (NULL), @@ -313,6 +320,7 @@ gst_tcpserversink_init_send (GstMultiFdSink * parent) GST_DEBUG_OBJECT (this, "listening on server socket %d with queue of %d", this->server_sock.fd, TCP_BACKLOG); if (listen (this->server_sock.fd, TCP_BACKLOG) == -1) { + gst_tcp_socket_close (&this->server_sock.fd); GST_ELEMENT_ERROR (this, RESOURCE, OPEN_READ, (NULL), ("Could not listen on server socket: %s", g_strerror (errno))); return FALSE; diff --git a/gst/tcp/gsttcpserversrc.c b/gst/tcp/gsttcpserversrc.c index 5735558203..f4c0b10f4d 100644 --- a/gst/tcp/gsttcpserversrc.c +++ b/gst/tcp/gsttcpserversrc.c @@ -409,6 +409,10 @@ gst_tcpserversrc_set_property (GObject * object, guint prop_id, switch (prop_id) { case ARG_HOST: + if (!g_value_get_string (value)) { + g_warning ("host property cannot be NULL"); + break; + } g_free (tcpserversrc->host); tcpserversrc->host = g_strdup (g_value_get_string (value)); break; @@ -483,8 +487,10 @@ gst_tcpserversrc_init_receive (GstTCPServerSrc * this) if (this->host) { gchar *host = gst_tcp_host_to_ip (GST_ELEMENT (this), this->host); - if (!host) + if (!host) { + gst_tcp_socket_close (&this->server_sock_fd); return FALSE; + } this->server_sin.sin_addr.s_addr = inet_addr (host); g_free (host); @@ -497,6 +503,7 @@ gst_tcpserversrc_init_receive (GstTCPServerSrc * this) sizeof (this->server_sin)); if (ret) { + gst_tcp_socket_close (&this->server_sock_fd); switch (errno) { default: GST_ELEMENT_ERROR (this, RESOURCE, OPEN_READ, (NULL), @@ -509,6 +516,7 @@ gst_tcpserversrc_init_receive (GstTCPServerSrc * this) GST_DEBUG_OBJECT (this, "listening on server socket %d with queue of %d", this->server_sock_fd, TCP_BACKLOG); if (listen (this->server_sock_fd, TCP_BACKLOG) == -1) { + gst_tcp_socket_close (&this->server_sock_fd); GST_ELEMENT_ERROR (this, RESOURCE, OPEN_READ, (NULL), ("Could not listen on server socket: %s", g_strerror (errno))); return FALSE; @@ -521,6 +529,7 @@ gst_tcpserversrc_init_receive (GstTCPServerSrc * this) accept (this->server_sock_fd, (struct sockaddr *) &this->client_sin, &this->client_sin_len); if (this->client_sock_fd == -1) { + gst_tcp_socket_close (&this->server_sock_fd); GST_ELEMENT_ERROR (this, RESOURCE, OPEN_READ, (NULL), ("Could not accept client on server socket: %s", g_strerror (errno))); return FALSE;