mirror of
https://gitlab.freedesktop.org/gstreamer/gstreamer.git
synced 2024-11-27 04:01:08 +00:00
ext/ogg/gstoggmux.c: eos/bos debugging
Original commit message from CVS: * ext/ogg/gstoggmux.c: eos/bos debugging * gst/tcp/gstmultifdsink.c: * gst/tcp/gstmultifdsink.h: * gst/tcp/gsttcp.c: * gst/tcp/gsttcp.h: * gst/tcp/gsttcpclientsink.c: * gst/tcp/gsttcpclientsrc.c: * gst/tcp/gsttcpserversink.c: * gst/tcp/gsttcpserversrc.c: improve reusability of elements after state changes and errors make multifdsink throw away streamheaders when receiving new ones
This commit is contained in:
parent
f612df4dfc
commit
66962aae37
10 changed files with 159 additions and 50 deletions
15
ChangeLog
15
ChangeLog
|
@ -1,3 +1,18 @@
|
||||||
|
2005-01-13 Thomas Vander Stichele <thomas at apestaart dot org>
|
||||||
|
|
||||||
|
* ext/ogg/gstoggmux.c:
|
||||||
|
eos/bos debugging
|
||||||
|
* gst/tcp/gstmultifdsink.c:
|
||||||
|
* gst/tcp/gstmultifdsink.h:
|
||||||
|
* gst/tcp/gsttcp.c:
|
||||||
|
* gst/tcp/gsttcp.h:
|
||||||
|
* gst/tcp/gsttcpclientsink.c:
|
||||||
|
* gst/tcp/gsttcpclientsrc.c:
|
||||||
|
* gst/tcp/gsttcpserversink.c:
|
||||||
|
* gst/tcp/gsttcpserversrc.c:
|
||||||
|
improve reusability of elements after state changes and errors
|
||||||
|
make multifdsink throw away streamheaders when receiving new ones
|
||||||
|
|
||||||
2005-01-13 Ronald S. Bultje <rbultje@ronald.bitfreak.net>
|
2005-01-13 Ronald S. Bultje <rbultje@ronald.bitfreak.net>
|
||||||
|
|
||||||
* ext/alsa/gstalsa.c: (gst_alsa_rates_probe):
|
* ext/alsa/gstalsa.c: (gst_alsa_rates_probe):
|
||||||
|
|
|
@ -1013,6 +1013,11 @@ gst_ogg_mux_loop (GstElement * element)
|
||||||
pad->prev_delta = delta_unit;
|
pad->prev_delta = delta_unit;
|
||||||
|
|
||||||
/* swap the packet in */
|
/* swap the packet in */
|
||||||
|
if (packet.e_o_s == 1)
|
||||||
|
GST_DEBUG_OBJECT (pad, "swapping in EOS packet");
|
||||||
|
if (packet.b_o_s == 1)
|
||||||
|
GST_DEBUG_OBJECT (pad, "swapping in BOS packet");
|
||||||
|
|
||||||
ogg_stream_packetin (&pad->stream, &packet);
|
ogg_stream_packetin (&pad->stream, &packet);
|
||||||
|
|
||||||
/* don't need the old buffer anymore */
|
/* don't need the old buffer anymore */
|
||||||
|
|
|
@ -85,12 +85,12 @@ enum
|
||||||
LAST_SIGNAL
|
LAST_SIGNAL
|
||||||
};
|
};
|
||||||
|
|
||||||
/* this is really arbitrary choosen */
|
/* this is really arbitrarily chosen */
|
||||||
#define DEFAULT_PROTOCOL GST_TCP_PROTOCOL_TYPE_NONE
|
#define DEFAULT_PROTOCOL GST_TCP_PROTOCOL_TYPE_NONE
|
||||||
#define DEFAULT_MODE GST_FDSET_MODE_POLL
|
#define DEFAULT_MODE GST_FDSET_MODE_POLL
|
||||||
#define DEFAULT_BUFFERS_MAX -1
|
#define DEFAULT_BUFFERS_MAX -1
|
||||||
#define DEFAULT_BUFFERS_SOFT_MAX -1
|
#define DEFAULT_BUFFERS_SOFT_MAX -1
|
||||||
#define DEFAULT_UNIT_TYPE GST_UNIT_TYPE_BUFFERS
|
#define DEFAULT_UNIT_TYPE GST_UNIT_TYPE_BUFFERS
|
||||||
#define DEFAULT_UNITS_MAX -1
|
#define DEFAULT_UNITS_MAX -1
|
||||||
#define DEFAULT_UNITS_SOFT_MAX -1
|
#define DEFAULT_UNITS_SOFT_MAX -1
|
||||||
#define DEFAULT_RECOVER_POLICY GST_RECOVER_POLICY_NONE
|
#define DEFAULT_RECOVER_POLICY GST_RECOVER_POLICY_NONE
|
||||||
|
@ -948,6 +948,8 @@ gst_multifdsink_handle_client_write (GstMultiFdSink * sink,
|
||||||
/* if we have streamheader buffers, and haven't sent them to this client
|
/* if we have streamheader buffers, and haven't sent them to this client
|
||||||
* yet, send them out one by one */
|
* yet, send them out one by one */
|
||||||
if (!client->streamheader_sent) {
|
if (!client->streamheader_sent) {
|
||||||
|
GST_DEBUG_OBJECT (sink, "[fd %5d] Sending streamheader, %d buffers", fd,
|
||||||
|
g_slist_length (sink->streamheader));
|
||||||
if (sink->streamheader) {
|
if (sink->streamheader) {
|
||||||
GSList *l;
|
GSList *l;
|
||||||
|
|
||||||
|
@ -1458,6 +1460,18 @@ gst_multifdsink_chain (GstPad * pad, GstData * _data)
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
GST_LOG_OBJECT (sink, "received buffer %p", buf);
|
||||||
|
/* if we get IN_CAPS buffers, but the previous buffer was not IN_CAPS,
|
||||||
|
* it means we're getting new streamheader buffers, and we should clear
|
||||||
|
* the old ones */
|
||||||
|
if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_IN_CAPS) &&
|
||||||
|
sink->previous_buffer_in_caps == FALSE) {
|
||||||
|
GST_DEBUG_OBJECT (sink,
|
||||||
|
"receiving new IN_CAPS buffers, clearing old streamheader");
|
||||||
|
g_slist_foreach (sink->streamheader, (GFunc) gst_data_unref, NULL);
|
||||||
|
g_slist_free (sink->streamheader);
|
||||||
|
sink->streamheader = NULL;
|
||||||
|
}
|
||||||
/* if the incoming buffer is marked as IN CAPS, then we assume for now
|
/* if the incoming buffer is marked as IN CAPS, then we assume for now
|
||||||
* it's a streamheader that needs to be sent to each new client, so we
|
* it's a streamheader that needs to be sent to each new client, so we
|
||||||
* put it on our internal list of streamheader buffers.
|
* put it on our internal list of streamheader buffers.
|
||||||
|
@ -1465,6 +1479,7 @@ gst_multifdsink_chain (GstPad * pad, GstData * _data)
|
||||||
* non IN_CAPS buffers so we properly keep track of clients that got
|
* non IN_CAPS buffers so we properly keep track of clients that got
|
||||||
* streamheaders. */
|
* streamheaders. */
|
||||||
if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_IN_CAPS)) {
|
if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_IN_CAPS)) {
|
||||||
|
sink->previous_buffer_in_caps = TRUE;
|
||||||
GST_DEBUG_OBJECT (sink,
|
GST_DEBUG_OBJECT (sink,
|
||||||
"appending IN_CAPS buffer with length %d to streamheader",
|
"appending IN_CAPS buffer with length %d to streamheader",
|
||||||
GST_BUFFER_SIZE (buf));
|
GST_BUFFER_SIZE (buf));
|
||||||
|
@ -1472,6 +1487,7 @@ gst_multifdsink_chain (GstPad * pad, GstData * _data)
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
sink->previous_buffer_in_caps = FALSE;
|
||||||
/* queue the buffer */
|
/* queue the buffer */
|
||||||
gst_multifdsink_queue_buffer (sink, buf);
|
gst_multifdsink_queue_buffer (sink, buf);
|
||||||
|
|
||||||
|
|
|
@ -115,7 +115,7 @@ typedef struct {
|
||||||
guint64 last_activity_time;
|
guint64 last_activity_time;
|
||||||
guint64 dropped_buffers;
|
guint64 dropped_buffers;
|
||||||
guint64 avg_queue_size;
|
guint64 avg_queue_size;
|
||||||
|
|
||||||
} GstTCPClient;
|
} GstTCPClient;
|
||||||
|
|
||||||
struct _GstMultiFdSink {
|
struct _GstMultiFdSink {
|
||||||
|
@ -130,13 +130,15 @@ struct _GstMultiFdSink {
|
||||||
GMutex *clientslock; /* lock to protect the clients list */
|
GMutex *clientslock; /* lock to protect the clients list */
|
||||||
GList *clients; /* list of clients we are serving */
|
GList *clients; /* list of clients we are serving */
|
||||||
GHashTable *fd_hash; /* index on fd to client */
|
GHashTable *fd_hash; /* index on fd to client */
|
||||||
|
|
||||||
GstFDSetMode mode;
|
GstFDSetMode mode;
|
||||||
GstFDSet *fdset;
|
GstFDSet *fdset;
|
||||||
|
|
||||||
GstFD control_sock[2];/* sockets for controlling the select call */
|
GstFD control_sock[2];/* sockets for controlling the select call */
|
||||||
|
|
||||||
GSList *streamheader; /* GSList of GstBuffers to use as streamheader */
|
GSList *streamheader; /* GSList of GstBuffers to use as streamheader */
|
||||||
|
gboolean previous_buffer_in_caps;
|
||||||
|
|
||||||
GstTCPProtocolType protocol;
|
GstTCPProtocolType protocol;
|
||||||
guint mtu;
|
guint mtu;
|
||||||
|
|
||||||
|
|
|
@ -45,7 +45,8 @@ GST_DEBUG_CATEGORY_EXTERN (tcp_debug);
|
||||||
|
|
||||||
/* resolve host to IP address, throwing errors if it fails */
|
/* resolve host to IP address, throwing errors if it fails */
|
||||||
/* host can already be an IP address */
|
/* host can already be an IP address */
|
||||||
/* returns a newly allocated gchar * with the dotted ip address */
|
/* returns a newly allocated gchar * with the dotted ip address,
|
||||||
|
or NULL, in which case it already fired an error. */
|
||||||
gchar *
|
gchar *
|
||||||
gst_tcp_host_to_ip (GstElement * element, const gchar * host)
|
gst_tcp_host_to_ip (GstElement * element, const gchar * host)
|
||||||
{
|
{
|
||||||
|
@ -140,9 +141,20 @@ gst_tcp_socket_read (int socket, void *buf, size_t count)
|
||||||
return bytes_read;
|
return bytes_read;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* close the socket and reset the fd. Used to clean up after errors. */
|
||||||
|
void
|
||||||
|
gst_tcp_socket_close (int *socket)
|
||||||
|
{
|
||||||
|
close (*socket);
|
||||||
|
*socket = -1;
|
||||||
|
}
|
||||||
|
|
||||||
/* read the gdp buffer header from the given socket
|
/* read the gdp buffer header from the given socket
|
||||||
* returns a GstData,
|
* returns:
|
||||||
* representing the new GstBuffer to read data into, or an EOS event
|
* - a GstData representing a GstBuffer in which data should be read
|
||||||
|
* - a GstData representing a GstEvent
|
||||||
|
* - NULL, indicating a connection close or an error, to be handled with
|
||||||
|
* EOS
|
||||||
*/
|
*/
|
||||||
GstData *
|
GstData *
|
||||||
gst_tcp_gdp_read_header (GstElement * this, int socket)
|
gst_tcp_gdp_read_header (GstElement * this, int socket)
|
||||||
|
@ -160,10 +172,9 @@ gst_tcp_gdp_read_header (GstElement * this, int socket)
|
||||||
ret = gst_tcp_socket_read (socket, header, readsize);
|
ret = gst_tcp_socket_read (socket, header, readsize);
|
||||||
/* if we read 0 bytes, and we're blocking, we hit eos */
|
/* if we read 0 bytes, and we're blocking, we hit eos */
|
||||||
if (ret == 0) {
|
if (ret == 0) {
|
||||||
GST_DEBUG ("blocking read returns 0, EOS");
|
GST_DEBUG ("blocking read returns 0, returning NULL");
|
||||||
g_free (header);
|
g_free (header);
|
||||||
gst_element_set_eos (GST_ELEMENT (this));
|
return NULL;
|
||||||
return GST_DATA (gst_event_new (GST_EVENT_EOS));
|
|
||||||
}
|
}
|
||||||
if (ret < 0) {
|
if (ret < 0) {
|
||||||
GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
|
GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
|
||||||
|
|
|
@ -44,6 +44,8 @@ gchar * gst_tcp_host_to_ip (GstElement *element, const gchar *host);
|
||||||
gint gst_tcp_socket_write (int socket, const void *buf, size_t count);
|
gint gst_tcp_socket_write (int socket, const void *buf, size_t count);
|
||||||
gint gst_tcp_socket_read (int socket, void *buf, size_t count);
|
gint gst_tcp_socket_read (int socket, void *buf, size_t count);
|
||||||
|
|
||||||
|
void gst_tcp_socket_close (int *socket);
|
||||||
|
|
||||||
GstData * gst_tcp_gdp_read_header (GstElement *this, int socket);
|
GstData * gst_tcp_gdp_read_header (GstElement *this, int socket);
|
||||||
GstCaps * gst_tcp_gdp_read_caps (GstElement *this, int socket);
|
GstCaps * gst_tcp_gdp_read_caps (GstElement *this, int socket);
|
||||||
|
|
||||||
|
|
|
@ -251,6 +251,10 @@ gst_tcpclientsink_set_property (GObject * object, guint prop_id,
|
||||||
|
|
||||||
switch (prop_id) {
|
switch (prop_id) {
|
||||||
case ARG_HOST:
|
case ARG_HOST:
|
||||||
|
if (!g_value_get_string (value)) {
|
||||||
|
g_warning ("host property cannot be NULL");
|
||||||
|
break;
|
||||||
|
}
|
||||||
g_free (tcpclientsink->host);
|
g_free (tcpclientsink->host);
|
||||||
tcpclientsink->host = g_strdup (g_value_get_string (value));
|
tcpclientsink->host = g_strdup (g_value_get_string (value));
|
||||||
break;
|
break;
|
||||||
|
@ -317,8 +321,10 @@ gst_tcpclientsink_init_send (GstTCPClientSink * this)
|
||||||
|
|
||||||
/* look up name if we need to */
|
/* look up name if we need to */
|
||||||
ip = gst_tcp_host_to_ip (GST_ELEMENT (this), this->host);
|
ip = gst_tcp_host_to_ip (GST_ELEMENT (this), this->host);
|
||||||
if (!ip)
|
if (!ip) {
|
||||||
|
gst_tcp_socket_close (&this->sock_fd);
|
||||||
return FALSE;
|
return FALSE;
|
||||||
|
}
|
||||||
GST_DEBUG_OBJECT (this, "IP address for host %s is %s", this->host, ip);
|
GST_DEBUG_OBJECT (this, "IP address for host %s is %s", this->host, ip);
|
||||||
|
|
||||||
/* connect to server */
|
/* connect to server */
|
||||||
|
@ -333,6 +339,7 @@ gst_tcpclientsink_init_send (GstTCPClientSink * this)
|
||||||
sizeof (this->server_sin));
|
sizeof (this->server_sin));
|
||||||
|
|
||||||
if (ret) {
|
if (ret) {
|
||||||
|
gst_tcp_socket_close (&this->sock_fd);
|
||||||
switch (errno) {
|
switch (errno) {
|
||||||
case ECONNREFUSED:
|
case ECONNREFUSED:
|
||||||
GST_ELEMENT_ERROR (this, RESOURCE, OPEN_WRITE,
|
GST_ELEMENT_ERROR (this, RESOURCE, OPEN_WRITE,
|
||||||
|
|
|
@ -201,6 +201,36 @@ gst_tcpclientsrc_getcaps (GstPad * pad)
|
||||||
return caps;
|
return caps;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* close the socket and associated resources
|
||||||
|
* unset OPEN flag
|
||||||
|
* used both to recover from errors and go to NULL state */
|
||||||
|
static void
|
||||||
|
gst_tcpclientsrc_close (GstTCPClientSrc * this)
|
||||||
|
{
|
||||||
|
GST_DEBUG_OBJECT (this, "closing socket");
|
||||||
|
if (this->sock_fd != -1) {
|
||||||
|
close (this->sock_fd);
|
||||||
|
this->sock_fd = -1;
|
||||||
|
}
|
||||||
|
this->caps_received = FALSE;
|
||||||
|
if (this->caps) {
|
||||||
|
gst_caps_free (this->caps);
|
||||||
|
this->caps = NULL;
|
||||||
|
}
|
||||||
|
GST_FLAG_UNSET (this, GST_TCPCLIENTSRC_OPEN);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* close socket and related items and return an EOS GstData
|
||||||
|
* called from _get */
|
||||||
|
static GstData *
|
||||||
|
gst_tcpclientsrc_eos (GstTCPClientSrc * src)
|
||||||
|
{
|
||||||
|
GST_DEBUG_OBJECT (src, "going to EOS");
|
||||||
|
gst_element_set_eos (GST_ELEMENT (src));
|
||||||
|
gst_tcpclientsrc_close (src);
|
||||||
|
return GST_DATA (gst_event_new (GST_EVENT_EOS));
|
||||||
|
}
|
||||||
|
|
||||||
static GstData *
|
static GstData *
|
||||||
gst_tcpclientsrc_get (GstPad * pad)
|
gst_tcpclientsrc_get (GstPad * pad)
|
||||||
{
|
{
|
||||||
|
@ -214,14 +244,18 @@ gst_tcpclientsrc_get (GstPad * pad)
|
||||||
g_return_val_if_fail (pad != NULL, NULL);
|
g_return_val_if_fail (pad != NULL, NULL);
|
||||||
g_return_val_if_fail (GST_IS_PAD (pad), NULL);
|
g_return_val_if_fail (GST_IS_PAD (pad), NULL);
|
||||||
src = GST_TCPCLIENTSRC (GST_OBJECT_PARENT (pad));
|
src = GST_TCPCLIENTSRC (GST_OBJECT_PARENT (pad));
|
||||||
g_return_val_if_fail (GST_FLAG_IS_SET (src, GST_TCPCLIENTSRC_OPEN), NULL);
|
if (!GST_FLAG_IS_SET (src, GST_TCPCLIENTSRC_OPEN)) {
|
||||||
|
GST_DEBUG_OBJECT (src, "connection to server closed, cannot give data");
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
GST_LOG_OBJECT (src, "asked for a buffer");
|
||||||
|
|
||||||
/* try to negotiate here */
|
/* try to negotiate here */
|
||||||
if (!gst_pad_is_negotiated (pad)) {
|
if (!gst_pad_is_negotiated (pad)) {
|
||||||
if (GST_PAD_LINK_FAILED (gst_pad_renegotiate (pad))) {
|
if (GST_PAD_LINK_FAILED (gst_pad_renegotiate (pad))) {
|
||||||
GST_ELEMENT_ERROR (src, CORE, NEGOTIATION, (NULL), GST_ERROR_SYSTEM);
|
GST_ELEMENT_ERROR (src, CORE, NEGOTIATION, (NULL), GST_ERROR_SYSTEM);
|
||||||
gst_buffer_unref (buf);
|
gst_buffer_unref (buf);
|
||||||
return GST_DATA (gst_event_new (GST_EVENT_EOS));
|
return gst_tcpclientsrc_eos (src);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -252,7 +286,7 @@ gst_tcpclientsrc_get (GstPad * pad)
|
||||||
if (ret <= 0) {
|
if (ret <= 0) {
|
||||||
GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
|
GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
|
||||||
("select failed: %s", g_strerror (errno)));
|
("select failed: %s", g_strerror (errno)));
|
||||||
return GST_DATA (gst_event_new (GST_EVENT_EOS));
|
return gst_tcpclientsrc_eos (src);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* ask how much is available for reading on the socket */
|
/* ask how much is available for reading on the socket */
|
||||||
|
@ -260,19 +294,24 @@ gst_tcpclientsrc_get (GstPad * pad)
|
||||||
if (ret < 0) {
|
if (ret < 0) {
|
||||||
GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
|
GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
|
||||||
("ioctl failed: %s", g_strerror (errno)));
|
("ioctl failed: %s", g_strerror (errno)));
|
||||||
return GST_DATA (gst_event_new (GST_EVENT_EOS));
|
return gst_tcpclientsrc_eos (src);
|
||||||
}
|
}
|
||||||
GST_LOG_OBJECT (src, "ioctl says %d bytes available", readsize);
|
GST_LOG_OBJECT (src, "ioctl says %d bytes available", readsize);
|
||||||
buf = gst_buffer_new_and_alloc (readsize);
|
buf = gst_buffer_new_and_alloc (readsize);
|
||||||
break;
|
break;
|
||||||
case GST_TCP_PROTOCOL_TYPE_GDP:
|
case GST_TCP_PROTOCOL_TYPE_GDP:
|
||||||
if (!(data = gst_tcp_gdp_read_header (GST_ELEMENT (src), src->sock_fd))) {
|
if (!(data = gst_tcp_gdp_read_header (GST_ELEMENT (src), src->sock_fd))) {
|
||||||
GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
|
return gst_tcpclientsrc_eos (src);
|
||||||
("Could not read data header through GDP"));
|
|
||||||
return GST_DATA (gst_event_new (GST_EVENT_EOS));
|
|
||||||
}
|
}
|
||||||
if (GST_IS_EVENT (data))
|
if (GST_IS_EVENT (data)) {
|
||||||
|
/* if we got back an EOS event, then we should go into eos ourselves */
|
||||||
|
if (GST_EVENT_TYPE (data) == GST_EVENT_EOS) {
|
||||||
|
gst_event_unref (data);
|
||||||
|
return gst_tcpclientsrc_eos (src);
|
||||||
|
}
|
||||||
return data;
|
return data;
|
||||||
|
}
|
||||||
|
|
||||||
buf = GST_BUFFER (data);
|
buf = GST_BUFFER (data);
|
||||||
|
|
||||||
GST_LOG_OBJECT (src, "Going to read data from socket into buffer %p",
|
GST_LOG_OBJECT (src, "Going to read data from socket into buffer %p",
|
||||||
|
@ -285,20 +324,19 @@ gst_tcpclientsrc_get (GstPad * pad)
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
GST_LOG_OBJECT (src, "Reading %d bytes", readsize);
|
GST_LOG_OBJECT (src, "Reading %d bytes into buffer", readsize);
|
||||||
ret = gst_tcp_socket_read (src->sock_fd, GST_BUFFER_DATA (buf), readsize);
|
ret = gst_tcp_socket_read (src->sock_fd, GST_BUFFER_DATA (buf), readsize);
|
||||||
if (ret < 0) {
|
if (ret < 0) {
|
||||||
GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
|
GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
|
||||||
gst_buffer_unref (buf);
|
gst_buffer_unref (buf);
|
||||||
return GST_DATA (gst_event_new (GST_EVENT_EOS));
|
return gst_tcpclientsrc_eos (src);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* if we read 0 bytes, and we're blocking, we hit eos */
|
/* if we read 0 bytes, and we're blocking, we hit eos */
|
||||||
if (ret == 0) {
|
if (ret == 0) {
|
||||||
GST_DEBUG ("blocking read returns 0, EOS");
|
GST_DEBUG_OBJECT (src, "blocking read returns 0, EOS");
|
||||||
gst_buffer_unref (buf);
|
gst_buffer_unref (buf);
|
||||||
gst_element_set_eos (GST_ELEMENT (src));
|
return gst_tcpclientsrc_eos (src);
|
||||||
return GST_DATA (gst_event_new (GST_EVENT_EOS));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
readsize = ret;
|
readsize = ret;
|
||||||
|
@ -360,6 +398,10 @@ gst_tcpclientsrc_set_property (GObject * object, guint prop_id,
|
||||||
|
|
||||||
switch (prop_id) {
|
switch (prop_id) {
|
||||||
case ARG_HOST:
|
case ARG_HOST:
|
||||||
|
if (!g_value_get_string (value)) {
|
||||||
|
g_warning ("host property cannot be NULL");
|
||||||
|
break;
|
||||||
|
}
|
||||||
g_free (tcpclientsrc->host);
|
g_free (tcpclientsrc->host);
|
||||||
tcpclientsrc->host = g_strdup (g_value_get_string (value));
|
tcpclientsrc->host = g_strdup (g_value_get_string (value));
|
||||||
break;
|
break;
|
||||||
|
@ -418,11 +460,14 @@ gst_tcpclientsrc_init_receive (GstTCPClientSrc * this)
|
||||||
}
|
}
|
||||||
GST_DEBUG_OBJECT (this, "opened receiving client socket with fd %d",
|
GST_DEBUG_OBJECT (this, "opened receiving client socket with fd %d",
|
||||||
this->sock_fd);
|
this->sock_fd);
|
||||||
|
GST_FLAG_SET (this, GST_TCPCLIENTSRC_OPEN);
|
||||||
|
|
||||||
/* look up name if we need to */
|
/* look up name if we need to */
|
||||||
ip = gst_tcp_host_to_ip (GST_ELEMENT (this), this->host);
|
ip = gst_tcp_host_to_ip (GST_ELEMENT (this), this->host);
|
||||||
if (!ip)
|
if (!ip) {
|
||||||
|
gst_tcpclientsrc_close (this);
|
||||||
return FALSE;
|
return FALSE;
|
||||||
|
}
|
||||||
GST_DEBUG_OBJECT (this, "IP address for host %s is %s", this->host, ip);
|
GST_DEBUG_OBJECT (this, "IP address for host %s is %s", this->host, ip);
|
||||||
|
|
||||||
/* connect to server */
|
/* connect to server */
|
||||||
|
@ -437,6 +482,7 @@ gst_tcpclientsrc_init_receive (GstTCPClientSrc * this)
|
||||||
sizeof (this->server_sin));
|
sizeof (this->server_sin));
|
||||||
|
|
||||||
if (ret) {
|
if (ret) {
|
||||||
|
gst_tcpclientsrc_close (this);
|
||||||
switch (errno) {
|
switch (errno) {
|
||||||
case ECONNREFUSED:
|
case ECONNREFUSED:
|
||||||
GST_ELEMENT_ERROR (this, RESOURCE, OPEN_READ,
|
GST_ELEMENT_ERROR (this, RESOURCE, OPEN_READ,
|
||||||
|
@ -455,7 +501,6 @@ gst_tcpclientsrc_init_receive (GstTCPClientSrc * this)
|
||||||
|
|
||||||
this->send_discont = TRUE;
|
this->send_discont = TRUE;
|
||||||
this->buffer_after_discont = NULL;
|
this->buffer_after_discont = NULL;
|
||||||
GST_FLAG_SET (this, GST_TCPCLIENTSRC_OPEN);
|
|
||||||
|
|
||||||
/* get the caps if we're using GDP */
|
/* get the caps if we're using GDP */
|
||||||
if (this->protocol == GST_TCP_PROTOCOL_TYPE_GDP) {
|
if (this->protocol == GST_TCP_PROTOCOL_TYPE_GDP) {
|
||||||
|
@ -465,11 +510,13 @@ gst_tcpclientsrc_init_receive (GstTCPClientSrc * this)
|
||||||
|
|
||||||
GST_DEBUG_OBJECT (this, "getting caps through GDP");
|
GST_DEBUG_OBJECT (this, "getting caps through GDP");
|
||||||
if (!(caps = gst_tcp_gdp_read_caps (GST_ELEMENT (this), this->sock_fd))) {
|
if (!(caps = gst_tcp_gdp_read_caps (GST_ELEMENT (this), this->sock_fd))) {
|
||||||
|
gst_tcpclientsrc_close (this);
|
||||||
GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
|
GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
|
||||||
("Could not read caps through GDP"));
|
("Could not read caps through GDP"));
|
||||||
return FALSE;
|
return FALSE;
|
||||||
}
|
}
|
||||||
if (!GST_IS_CAPS (caps)) {
|
if (!GST_IS_CAPS (caps)) {
|
||||||
|
gst_tcpclientsrc_close (this);
|
||||||
GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
|
GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
|
||||||
("Could not read caps through GDP"));
|
("Could not read caps through GDP"));
|
||||||
return FALSE;
|
return FALSE;
|
||||||
|
@ -483,34 +530,21 @@ gst_tcpclientsrc_init_receive (GstTCPClientSrc * this)
|
||||||
return TRUE;
|
return TRUE;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void
|
|
||||||
gst_tcpclientsrc_close (GstTCPClientSrc * this)
|
|
||||||
{
|
|
||||||
if (this->sock_fd != -1) {
|
|
||||||
close (this->sock_fd);
|
|
||||||
this->sock_fd = -1;
|
|
||||||
}
|
|
||||||
this->caps_received = FALSE;
|
|
||||||
if (this->caps) {
|
|
||||||
gst_caps_free (this->caps);
|
|
||||||
this->caps = NULL;
|
|
||||||
}
|
|
||||||
GST_FLAG_UNSET (this, GST_TCPCLIENTSRC_OPEN);
|
|
||||||
}
|
|
||||||
|
|
||||||
static GstElementStateReturn
|
static GstElementStateReturn
|
||||||
gst_tcpclientsrc_change_state (GstElement * element)
|
gst_tcpclientsrc_change_state (GstElement * element)
|
||||||
{
|
{
|
||||||
g_return_val_if_fail (GST_IS_TCPCLIENTSRC (element), GST_STATE_FAILURE);
|
g_return_val_if_fail (GST_IS_TCPCLIENTSRC (element), GST_STATE_FAILURE);
|
||||||
|
|
||||||
if (GST_STATE_PENDING (element) == GST_STATE_NULL) {
|
/* if open and going to NULL, close it */
|
||||||
if (GST_FLAG_IS_SET (element, GST_TCPCLIENTSRC_OPEN))
|
if (GST_FLAG_IS_SET (element, GST_TCPCLIENTSRC_OPEN) &&
|
||||||
gst_tcpclientsrc_close (GST_TCPCLIENTSRC (element));
|
GST_STATE_PENDING (element) == GST_STATE_NULL) {
|
||||||
} else {
|
gst_tcpclientsrc_close (GST_TCPCLIENTSRC (element));
|
||||||
if (!GST_FLAG_IS_SET (element, GST_TCPCLIENTSRC_OPEN)) {
|
}
|
||||||
if (!gst_tcpclientsrc_init_receive (GST_TCPCLIENTSRC (element)))
|
/* if closed and going to a state higher than NULL, open it */
|
||||||
return GST_STATE_FAILURE;
|
if (!GST_FLAG_IS_SET (element, GST_TCPCLIENTSRC_OPEN) &&
|
||||||
}
|
GST_STATE_PENDING (element) > GST_STATE_NULL) {
|
||||||
|
if (!gst_tcpclientsrc_init_receive (GST_TCPCLIENTSRC (element)))
|
||||||
|
return GST_STATE_FAILURE;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (GST_ELEMENT_CLASS (parent_class)->change_state)
|
if (GST_ELEMENT_CLASS (parent_class)->change_state)
|
||||||
|
|
|
@ -218,6 +218,10 @@ gst_tcpserversink_set_property (GObject * object, guint prop_id,
|
||||||
|
|
||||||
switch (prop_id) {
|
switch (prop_id) {
|
||||||
case ARG_HOST:
|
case ARG_HOST:
|
||||||
|
if (!g_value_get_string (value)) {
|
||||||
|
g_warning ("host property cannot be NULL");
|
||||||
|
break;
|
||||||
|
}
|
||||||
g_free (sink->host);
|
g_free (sink->host);
|
||||||
sink->host = g_strdup (g_value_get_string (value));
|
sink->host = g_strdup (g_value_get_string (value));
|
||||||
break;
|
break;
|
||||||
|
@ -273,6 +277,7 @@ gst_tcpserversink_init_send (GstMultiFdSink * parent)
|
||||||
/* make address reusable */
|
/* make address reusable */
|
||||||
if (setsockopt (this->server_sock.fd, SOL_SOCKET, SO_REUSEADDR, &ret,
|
if (setsockopt (this->server_sock.fd, SOL_SOCKET, SO_REUSEADDR, &ret,
|
||||||
sizeof (int)) < 0) {
|
sizeof (int)) < 0) {
|
||||||
|
gst_tcp_socket_close (&this->server_sock.fd);
|
||||||
GST_ELEMENT_ERROR (this, RESOURCE, SETTINGS, (NULL),
|
GST_ELEMENT_ERROR (this, RESOURCE, SETTINGS, (NULL),
|
||||||
("Could not setsockopt: %s", g_strerror (errno)));
|
("Could not setsockopt: %s", g_strerror (errno)));
|
||||||
return FALSE;
|
return FALSE;
|
||||||
|
@ -280,6 +285,7 @@ gst_tcpserversink_init_send (GstMultiFdSink * parent)
|
||||||
/* keep connection alive; avoids SIGPIPE during write */
|
/* keep connection alive; avoids SIGPIPE during write */
|
||||||
if (setsockopt (this->server_sock.fd, SOL_SOCKET, SO_KEEPALIVE, &ret,
|
if (setsockopt (this->server_sock.fd, SOL_SOCKET, SO_KEEPALIVE, &ret,
|
||||||
sizeof (int)) < 0) {
|
sizeof (int)) < 0) {
|
||||||
|
gst_tcp_socket_close (&this->server_sock.fd);
|
||||||
GST_ELEMENT_ERROR (this, RESOURCE, SETTINGS, (NULL),
|
GST_ELEMENT_ERROR (this, RESOURCE, SETTINGS, (NULL),
|
||||||
("Could not setsockopt: %s", g_strerror (errno)));
|
("Could not setsockopt: %s", g_strerror (errno)));
|
||||||
return FALSE;
|
return FALSE;
|
||||||
|
@ -297,6 +303,7 @@ gst_tcpserversink_init_send (GstMultiFdSink * parent)
|
||||||
sizeof (this->server_sin));
|
sizeof (this->server_sin));
|
||||||
|
|
||||||
if (ret) {
|
if (ret) {
|
||||||
|
gst_tcp_socket_close (&this->server_sock.fd);
|
||||||
switch (errno) {
|
switch (errno) {
|
||||||
default:
|
default:
|
||||||
GST_ELEMENT_ERROR (this, RESOURCE, OPEN_READ, (NULL),
|
GST_ELEMENT_ERROR (this, RESOURCE, OPEN_READ, (NULL),
|
||||||
|
@ -313,6 +320,7 @@ gst_tcpserversink_init_send (GstMultiFdSink * parent)
|
||||||
GST_DEBUG_OBJECT (this, "listening on server socket %d with queue of %d",
|
GST_DEBUG_OBJECT (this, "listening on server socket %d with queue of %d",
|
||||||
this->server_sock.fd, TCP_BACKLOG);
|
this->server_sock.fd, TCP_BACKLOG);
|
||||||
if (listen (this->server_sock.fd, TCP_BACKLOG) == -1) {
|
if (listen (this->server_sock.fd, TCP_BACKLOG) == -1) {
|
||||||
|
gst_tcp_socket_close (&this->server_sock.fd);
|
||||||
GST_ELEMENT_ERROR (this, RESOURCE, OPEN_READ, (NULL),
|
GST_ELEMENT_ERROR (this, RESOURCE, OPEN_READ, (NULL),
|
||||||
("Could not listen on server socket: %s", g_strerror (errno)));
|
("Could not listen on server socket: %s", g_strerror (errno)));
|
||||||
return FALSE;
|
return FALSE;
|
||||||
|
|
|
@ -409,6 +409,10 @@ gst_tcpserversrc_set_property (GObject * object, guint prop_id,
|
||||||
|
|
||||||
switch (prop_id) {
|
switch (prop_id) {
|
||||||
case ARG_HOST:
|
case ARG_HOST:
|
||||||
|
if (!g_value_get_string (value)) {
|
||||||
|
g_warning ("host property cannot be NULL");
|
||||||
|
break;
|
||||||
|
}
|
||||||
g_free (tcpserversrc->host);
|
g_free (tcpserversrc->host);
|
||||||
tcpserversrc->host = g_strdup (g_value_get_string (value));
|
tcpserversrc->host = g_strdup (g_value_get_string (value));
|
||||||
break;
|
break;
|
||||||
|
@ -483,8 +487,10 @@ gst_tcpserversrc_init_receive (GstTCPServerSrc * this)
|
||||||
if (this->host) {
|
if (this->host) {
|
||||||
gchar *host = gst_tcp_host_to_ip (GST_ELEMENT (this), this->host);
|
gchar *host = gst_tcp_host_to_ip (GST_ELEMENT (this), this->host);
|
||||||
|
|
||||||
if (!host)
|
if (!host) {
|
||||||
|
gst_tcp_socket_close (&this->server_sock_fd);
|
||||||
return FALSE;
|
return FALSE;
|
||||||
|
}
|
||||||
|
|
||||||
this->server_sin.sin_addr.s_addr = inet_addr (host);
|
this->server_sin.sin_addr.s_addr = inet_addr (host);
|
||||||
g_free (host);
|
g_free (host);
|
||||||
|
@ -497,6 +503,7 @@ gst_tcpserversrc_init_receive (GstTCPServerSrc * this)
|
||||||
sizeof (this->server_sin));
|
sizeof (this->server_sin));
|
||||||
|
|
||||||
if (ret) {
|
if (ret) {
|
||||||
|
gst_tcp_socket_close (&this->server_sock_fd);
|
||||||
switch (errno) {
|
switch (errno) {
|
||||||
default:
|
default:
|
||||||
GST_ELEMENT_ERROR (this, RESOURCE, OPEN_READ, (NULL),
|
GST_ELEMENT_ERROR (this, RESOURCE, OPEN_READ, (NULL),
|
||||||
|
@ -509,6 +516,7 @@ gst_tcpserversrc_init_receive (GstTCPServerSrc * this)
|
||||||
GST_DEBUG_OBJECT (this, "listening on server socket %d with queue of %d",
|
GST_DEBUG_OBJECT (this, "listening on server socket %d with queue of %d",
|
||||||
this->server_sock_fd, TCP_BACKLOG);
|
this->server_sock_fd, TCP_BACKLOG);
|
||||||
if (listen (this->server_sock_fd, TCP_BACKLOG) == -1) {
|
if (listen (this->server_sock_fd, TCP_BACKLOG) == -1) {
|
||||||
|
gst_tcp_socket_close (&this->server_sock_fd);
|
||||||
GST_ELEMENT_ERROR (this, RESOURCE, OPEN_READ, (NULL),
|
GST_ELEMENT_ERROR (this, RESOURCE, OPEN_READ, (NULL),
|
||||||
("Could not listen on server socket: %s", g_strerror (errno)));
|
("Could not listen on server socket: %s", g_strerror (errno)));
|
||||||
return FALSE;
|
return FALSE;
|
||||||
|
@ -521,6 +529,7 @@ gst_tcpserversrc_init_receive (GstTCPServerSrc * this)
|
||||||
accept (this->server_sock_fd, (struct sockaddr *) &this->client_sin,
|
accept (this->server_sock_fd, (struct sockaddr *) &this->client_sin,
|
||||||
&this->client_sin_len);
|
&this->client_sin_len);
|
||||||
if (this->client_sock_fd == -1) {
|
if (this->client_sock_fd == -1) {
|
||||||
|
gst_tcp_socket_close (&this->server_sock_fd);
|
||||||
GST_ELEMENT_ERROR (this, RESOURCE, OPEN_READ, (NULL),
|
GST_ELEMENT_ERROR (this, RESOURCE, OPEN_READ, (NULL),
|
||||||
("Could not accept client on server socket: %s", g_strerror (errno)));
|
("Could not accept client on server socket: %s", g_strerror (errno)));
|
||||||
return FALSE;
|
return FALSE;
|
||||||
|
|
Loading…
Reference in a new issue