gst/tcp/: Removed fdset and stress test, they are now known as GstPoll in core.

Original commit message from CVS:
Patch by: Peter Kjellerstedt  <pkj at axis com>
* gst/tcp/Makefile.am:
* gst/tcp/fdsetstress.c:
* gst/tcp/gstfdset.c:
* gst/tcp/gstfdset.h:
Removed fdset and stress test, they are now known as GstPoll in
core.
* gst/tcp/gstmultifdsink.c: (gst_multi_fd_sink_class_init),
(gst_multi_fd_sink_add_full), (gst_multi_fd_sink_remove),
(gst_multi_fd_sink_clear), (gst_multi_fd_sink_remove_client_link),
(gst_multi_fd_sink_handle_client_write),
(gst_multi_fd_sink_queue_buffer),
(gst_multi_fd_sink_handle_clients), (gst_multi_fd_sink_start),
(gst_multi_fd_sink_stop):
* gst/tcp/gstmultifdsink.h:
* gst/tcp/gsttcp.c: (gst_tcp_socket_read), (gst_tcp_socket_close),
(gst_tcp_read_buffer), (gst_tcp_gdp_read_buffer),
(gst_tcp_gdp_read_caps):
* gst/tcp/gsttcp.h:
* gst/tcp/gsttcpclientsink.c: (gst_tcp_client_sink_init),
(gst_tcp_client_sink_setcaps), (gst_tcp_client_sink_render),
(gst_tcp_client_sink_start), (gst_tcp_client_sink_stop):
* gst/tcp/gsttcpclientsink.h:
* gst/tcp/gsttcpclientsrc.c: (gst_tcp_client_src_init),
(gst_tcp_client_src_create), (gst_tcp_client_src_start),
(gst_tcp_client_src_stop), (gst_tcp_client_src_unlock):
* gst/tcp/gsttcpclientsrc.h:
* gst/tcp/gsttcpserversink.c: (gst_tcp_server_sink_handle_wait),
(gst_tcp_server_sink_init_send), (gst_tcp_server_sink_close):
* gst/tcp/gsttcpserversink.h:
* gst/tcp/gsttcpserversrc.c: (gst_tcp_server_src_init),
(gst_tcp_server_src_create), (gst_tcp_server_src_start),
(gst_tcp_server_src_stop), (gst_tcp_server_src_unlock):
* gst/tcp/gsttcpserversrc.h:
Port to GstPoll. See #505417.
This commit is contained in:
Peter Kjellerstedt 2008-02-28 10:54:14 +00:00 committed by Wim Taymans
parent 45e039dd76
commit 405571a67e
17 changed files with 196 additions and 1096 deletions

View file

@ -1,3 +1,43 @@
2008-02-28 Wim Taymans <wim.taymans@collabora.co.uk>
Patch by: Peter Kjellerstedt <pkj at axis com>
* gst/tcp/Makefile.am:
* gst/tcp/fdsetstress.c:
* gst/tcp/gstfdset.c:
* gst/tcp/gstfdset.h:
Removed fdset and stress test, they are now known as GstPoll in
core.
* gst/tcp/gstmultifdsink.c: (gst_multi_fd_sink_class_init),
(gst_multi_fd_sink_add_full), (gst_multi_fd_sink_remove),
(gst_multi_fd_sink_clear), (gst_multi_fd_sink_remove_client_link),
(gst_multi_fd_sink_handle_client_write),
(gst_multi_fd_sink_queue_buffer),
(gst_multi_fd_sink_handle_clients), (gst_multi_fd_sink_start),
(gst_multi_fd_sink_stop):
* gst/tcp/gstmultifdsink.h:
* gst/tcp/gsttcp.c: (gst_tcp_socket_read), (gst_tcp_socket_close),
(gst_tcp_read_buffer), (gst_tcp_gdp_read_buffer),
(gst_tcp_gdp_read_caps):
* gst/tcp/gsttcp.h:
* gst/tcp/gsttcpclientsink.c: (gst_tcp_client_sink_init),
(gst_tcp_client_sink_setcaps), (gst_tcp_client_sink_render),
(gst_tcp_client_sink_start), (gst_tcp_client_sink_stop):
* gst/tcp/gsttcpclientsink.h:
* gst/tcp/gsttcpclientsrc.c: (gst_tcp_client_src_init),
(gst_tcp_client_src_create), (gst_tcp_client_src_start),
(gst_tcp_client_src_stop), (gst_tcp_client_src_unlock):
* gst/tcp/gsttcpclientsrc.h:
* gst/tcp/gsttcpserversink.c: (gst_tcp_server_sink_handle_wait),
(gst_tcp_server_sink_init_send), (gst_tcp_server_sink_close):
* gst/tcp/gsttcpserversink.h:
* gst/tcp/gsttcpserversrc.c: (gst_tcp_server_src_init),
(gst_tcp_server_src_create), (gst_tcp_server_src_start),
(gst_tcp_server_src_stop), (gst_tcp_server_src_unlock):
* gst/tcp/gsttcpserversrc.h:
Port to GstPoll. See #505417.
2008-02-28 Wim Taymans <wim.taymans@collabora.co.uk>
Patch by: Peter Kjellerstedt <pkj at axis com>

View file

@ -15,7 +15,6 @@ BUILT_SOURCES = $(built_sources) $(built_headers)
libgsttcp_la_SOURCES = \
gsttcpplugin.c \
gsttcp.c \
gstfdset.c \
gstmultifdsink.c \
gsttcpclientsrc.c gsttcpclientsink.c \
gsttcpserversrc.c gsttcpserversink.c
@ -31,7 +30,6 @@ libgsttcp_la_LIBADD = $(GST_BASE_LIBS) $(GST_GDP_LIBS) $(GST_LIBS)
noinst_HEADERS = \
gsttcpplugin.h \
gsttcp.h \
gstfdset.h \
gstmultifdsink.h \
gsttcpclientsrc.h gsttcpclientsink.h \
gsttcpserversrc.h gsttcpserversink.h
@ -39,9 +37,3 @@ noinst_HEADERS = \
CLEANFILES = $(BUILT_SOURCES)
EXTRA_DIST = gsttcp-marshal.list
noinst_PROGRAMS = fdsetstress
fdsetstress_SOURCES = fdsetstress.c gstfdset.c
fdsetstress_CFLAGS = $(GST_CFLAGS)
fdsetstress_LDFLAGS = $(GST_LIBS)

View file

@ -1,179 +0,0 @@
/* GStreamer
* Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu>
* Copyright (C) <2004> Thomas Vander Stichele <thomas at apestaart dot org>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 59 Temple Place - Suite 330,
* Boston, MA 02111-1307, USA.
*/
#include <stdlib.h>
#include <gst/gst.h>
#include "gstfdset.h"
static GstFDSet *set;
static GList *fds = NULL;
static GMutex *fdlock;
static GTimer *timer;
#define MAX_THREADS 100
static void
mess_some_more ()
{
GList *walk;
gint random;
gint removed = 0;
g_mutex_lock (fdlock);
for (walk = fds; walk;) {
GstFD *fd = (GstFD *) walk->data;
walk = g_list_next (walk);
random = (gint) (10.0 * rand () / (RAND_MAX + 1.0));
switch (random) {
case 0:
{
/*
GstFD *newfd = g_new0 (GstFD, 1);
gst_fdset_add_fd (set, newfd);
fds = g_list_prepend (fds, newfd);
*/
break;
}
case 1:
if ((gint) (10.0 * rand () / (RAND_MAX + 1.0)) < 2) {
gst_fdset_remove_fd (set, fd);
fds = g_list_remove (fds, fd);
g_free (fd);
removed++;
}
break;
case 2:
gst_fdset_fd_ctl_write (set, fd, TRUE);
break;
case 3:
gst_fdset_fd_ctl_write (set, fd, FALSE);
break;
case 4:
gst_fdset_fd_ctl_read (set, fd, TRUE);
break;
case 5:
gst_fdset_fd_ctl_read (set, fd, FALSE);
break;
case 6:
gst_fdset_fd_has_closed (set, fd);
break;
case 7:
gst_fdset_fd_has_error (set, fd);
break;
case 8:
gst_fdset_fd_can_read (set, fd);
break;
case 9:
gst_fdset_fd_can_write (set, fd);
break;
default:
g_assert_not_reached ();
break;
}
}
if (g_list_length (fds) < 900) {
random = removed + (gint) (2.0 * rand () / (RAND_MAX + 1.0));
while (random) {
GstFD *newfd = g_new0 (GstFD, 1);
gst_fdset_add_fd (set, newfd);
fds = g_list_prepend (fds, newfd);
random--;
}
}
g_mutex_unlock (fdlock);
}
void *
run_test (void *threadid)
{
gint id = GPOINTER_TO_INT (threadid);
while (TRUE) {
if (id == 0) {
gint res = gst_fdset_wait (set, 10);
if (res < 0) {
g_print ("error %d %s\n", errno, g_strerror (errno));
}
} else {
mess_some_more ();
if (g_timer_elapsed (timer, NULL) > 0.5) {
g_mutex_lock (fdlock);
g_print ("active fds :%d\n", g_list_length (fds));
g_timer_start (timer);
g_mutex_unlock (fdlock);
}
g_usleep (1);
}
}
g_thread_exit (NULL);
return NULL;
}
gint
main (gint argc, gchar * argv[])
{
GThread *threads[MAX_THREADS];
gint num_threads;
gint t;
gst_init (&argc, &argv);
fdlock = g_mutex_new ();
timer = g_timer_new ();
if (argc != 2) {
g_print ("usage: %s <num_threads>\n", argv[0]);
exit (-1);
}
num_threads = atoi (argv[1]);
set = gst_fdset_new (GST_FDSET_MODE_POLL);
for (t = 0; t < num_threads; t++) {
GError *error = NULL;
threads[t] = g_thread_create (run_test, GINT_TO_POINTER (t), TRUE, &error);
if (error) {
printf ("ERROR: g_thread_create() %s\n", error->message);
exit (-1);
}
}
printf ("main(): Created %d threads.\n", t);
for (t = 0; t < num_threads; t++) {
g_thread_join (threads[t]);
}
gst_fdset_free (set);
return 0;
}

View file

@ -1,532 +0,0 @@
/* GStreamer
* Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu>
* Copyright (C) <2004> Wim Taymans <wim@fluendo.com>
*
* gsttcpfdset.h: fdset datastructure
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 59 Temple Place - Suite 330,
* Boston, MA 02111-1307, USA.
*/
#define MIN_POLLFDS 32
#define INIT_POLLFDS MIN_POLLFDS
#include <sys/poll.h>
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>
/* OS/X needs this because of bad headers */
#include <string.h>
#include "gstfdset.h"
GType
gst_fdset_mode_get_type (void)
{
static GType fdset_mode_type = 0;
static const GEnumValue fdset_mode[] = {
{GST_FDSET_MODE_SELECT, "Select", "select"},
{GST_FDSET_MODE_POLL, "Poll", "poll"},
{GST_FDSET_MODE_EPOLL, "EPoll", "epoll"},
{0, NULL, NULL},
};
if (!fdset_mode_type) {
fdset_mode_type = g_enum_register_static ("GstFDSetMode", fdset_mode);
}
return fdset_mode_type;
}
struct _GstFDSet
{
GstFDSetMode mode;
/* for poll */
struct pollfd *testpollfds;
gint last_testpollfds;
gint testsize;
struct pollfd *pollfds;
gint size;
gint free;
gint last_pollfds;
GMutex *poll_lock;
/* for select */
fd_set readfds, writefds; /* input */
fd_set testreadfds, testwritefds; /* output */
};
static gint
nearest_pow (gint num)
{
/* hacker's delight page 48 */
num -= 1;
num |= num >> 1;
num |= num >> 2;
num |= num >> 4;
num |= num >> 8;
num |= num >> 16;
return num + 1;
}
/* resize a given pollfd array from old_size number of items
* to new_size number of items. Also initializes the new elements
* with the default values. */
static struct pollfd *
resize (struct pollfd *fds, gint old_size, gint new_size)
{
struct pollfd *res;
gint i;
res = g_realloc (fds, new_size * sizeof (struct pollfd));
for (i = old_size; i < new_size; i++) {
res[i].fd = -1;
res[i].events = 0;
res[i].revents = 0;
}
return res;
}
static void
ensure_size (GstFDSet * set, gint len)
{
if (len > set->size) {
len = nearest_pow (len);
len = MAX (len, MIN_POLLFDS);
set->pollfds = resize (set->pollfds, set->size, len);
set->size = len;
}
}
GstFDSet *
gst_fdset_new (GstFDSetMode mode)
{
GstFDSet *nset;
nset = g_new0 (GstFDSet, 1);
nset->mode = mode;
switch (mode) {
case GST_FDSET_MODE_SELECT:
FD_ZERO (&nset->readfds);
FD_ZERO (&nset->writefds);
break;
case GST_FDSET_MODE_POLL:
nset->pollfds = NULL;
nset->testpollfds = NULL;
nset->free = 0;
nset->last_pollfds = 0;
nset->poll_lock = g_mutex_new ();
ensure_size (nset, MIN_POLLFDS);
break;
case GST_FDSET_MODE_EPOLL:
g_warning ("implement EPOLL mode in GstFDSet");
break;
default:
break;
}
return nset;
}
void
gst_fdset_free (GstFDSet * set)
{
g_return_if_fail (set != NULL);
switch (set->mode) {
case GST_FDSET_MODE_SELECT:
break;
case GST_FDSET_MODE_POLL:
g_free (set->testpollfds);
g_free (set->pollfds);
g_mutex_free (set->poll_lock);
break;
case GST_FDSET_MODE_EPOLL:
g_warning ("implement EPOLL mode in GstFDSet");
break;
default:
break;
}
g_free (set);
}
void
gst_fdset_set_mode (GstFDSet * set, GstFDSetMode mode)
{
g_return_if_fail (set != NULL);
g_warning ("implement set_mode in GstFDSet");
}
GstFDSetMode
gst_fdset_get_mode (GstFDSet * set)
{
g_return_val_if_fail (set != NULL, FALSE);
return set->mode;
}
gboolean
gst_fdset_add_fd (GstFDSet * set, GstFD * fd)
{
gboolean res = FALSE;
g_return_val_if_fail (set != NULL, FALSE);
g_return_val_if_fail (fd != NULL, FALSE);
switch (set->mode) {
case GST_FDSET_MODE_SELECT:
res = TRUE;
break;
case GST_FDSET_MODE_POLL:
{
struct pollfd *nfd;
gint idx;
g_mutex_lock (set->poll_lock);
ensure_size (set, set->last_pollfds + 1);
idx = set->free;
if (idx == -1) {
/* find free space */
while (idx < set->last_pollfds) {
idx++;
if (set->pollfds[idx].fd == -1)
break;
}
}
nfd = &set->pollfds[idx];
nfd->fd = fd->fd;
nfd->events = POLLERR | POLLNVAL | POLLHUP;
nfd->revents = 0;
/* see if we have one fd more */
set->last_pollfds = MAX (idx + 1, set->last_pollfds);
fd->idx = idx;
set->free = -1;
g_mutex_unlock (set->poll_lock);
res = TRUE;
break;
}
case GST_FDSET_MODE_EPOLL:
break;
}
return res;
}
gboolean
gst_fdset_remove_fd (GstFDSet * set, GstFD * fd)
{
gboolean res = FALSE;
g_return_val_if_fail (set != NULL, FALSE);
g_return_val_if_fail (fd != NULL, FALSE);
switch (set->mode) {
case GST_FDSET_MODE_SELECT:
/* nothing */
FD_CLR (fd->fd, &set->writefds);
FD_CLR (fd->fd, &set->readfds);
res = TRUE;
break;
case GST_FDSET_MODE_POLL:
{
g_mutex_lock (set->poll_lock);
/* FIXME on some platforms poll doesn't ignore the fd
* when set to -1 */
set->pollfds[fd->idx].fd = -1;
set->pollfds[fd->idx].events = 0;
set->pollfds[fd->idx].revents = 0;
/* if we removed the last fd, we can lower the last_pollfds */
if (fd->idx + 1 == set->last_pollfds) {
set->last_pollfds--;
}
fd->idx = -1;
if (set->free == -1) {
set->free = fd->idx;
} else {
set->free = MIN (set->free, fd->idx);
}
g_mutex_unlock (set->poll_lock);
res = TRUE;
break;
}
case GST_FDSET_MODE_EPOLL:
break;
}
return res;
}
void
gst_fdset_fd_ctl_write (GstFDSet * set, GstFD * fd, gboolean active)
{
g_return_if_fail (set != NULL);
g_return_if_fail (fd != NULL);
switch (set->mode) {
case GST_FDSET_MODE_SELECT:
if (active)
FD_SET (fd->fd, &set->writefds);
else
FD_CLR (fd->fd, &set->writefds);
break;
case GST_FDSET_MODE_POLL:
{
gint idx;
g_mutex_lock (set->poll_lock);
idx = fd->idx;
if (idx >= 0) {
gint events = set->pollfds[idx].events;
if (active)
events |= POLLOUT;
else
events &= ~POLLOUT;
set->pollfds[idx].events = events;
}
g_mutex_unlock (set->poll_lock);
break;
}
case GST_FDSET_MODE_EPOLL:
break;
}
}
void
gst_fdset_fd_ctl_read (GstFDSet * set, GstFD * fd, gboolean active)
{
g_return_if_fail (set != NULL);
g_return_if_fail (fd != NULL);
switch (set->mode) {
case GST_FDSET_MODE_SELECT:
if (active)
FD_SET (fd->fd, &set->readfds);
else
FD_CLR (fd->fd, &set->readfds);
break;
case GST_FDSET_MODE_POLL:
{
gint idx;
g_mutex_lock (set->poll_lock);
idx = fd->idx;
if (idx >= 0) {
gint events = set->pollfds[idx].events;
if (active)
events |= (POLLIN | POLLPRI);
else
events &= ~(POLLIN | POLLPRI);
set->pollfds[idx].events = events;
}
g_mutex_unlock (set->poll_lock);
break;
}
case GST_FDSET_MODE_EPOLL:
break;
}
}
gboolean
gst_fdset_fd_has_closed (GstFDSet * set, GstFD * fd)
{
gboolean res = FALSE;
g_return_val_if_fail (set != NULL, FALSE);
g_return_val_if_fail (fd != NULL, FALSE);
switch (set->mode) {
case GST_FDSET_MODE_SELECT:
res = FALSE;
break;
case GST_FDSET_MODE_POLL:
{
gint idx;
g_mutex_lock (set->poll_lock);
idx = fd->idx;
if (idx >= 0 && idx < set->last_testpollfds) {
res = (set->testpollfds[idx].revents & POLLHUP) != 0;
}
g_mutex_unlock (set->poll_lock);
break;
}
case GST_FDSET_MODE_EPOLL:
break;
}
return res;
}
gboolean
gst_fdset_fd_has_error (GstFDSet * set, GstFD * fd)
{
gboolean res = FALSE;
g_return_val_if_fail (set != NULL, FALSE);
g_return_val_if_fail (fd != NULL, FALSE);
switch (set->mode) {
case GST_FDSET_MODE_SELECT:
res = FALSE;
break;
case GST_FDSET_MODE_POLL:
{
gint idx;
g_mutex_lock (set->poll_lock);
idx = fd->idx;
if (idx >= 0 && idx < set->last_testpollfds) {
res = (set->testpollfds[idx].revents & (POLLERR | POLLNVAL)) != 0;
}
g_mutex_unlock (set->poll_lock);
break;
}
case GST_FDSET_MODE_EPOLL:
break;
}
return res;
}
gboolean
gst_fdset_fd_can_read (GstFDSet * set, GstFD * fd)
{
gboolean res = FALSE;
g_return_val_if_fail (set != NULL, FALSE);
g_return_val_if_fail (fd != NULL, FALSE);
switch (set->mode) {
case GST_FDSET_MODE_SELECT:
res = FD_ISSET (fd->fd, &set->testreadfds);
break;
case GST_FDSET_MODE_POLL:
{
gint idx;
g_mutex_lock (set->poll_lock);
idx = fd->idx;
if (idx >= 0 && idx < set->last_testpollfds) {
res = (set->testpollfds[idx].revents & (POLLIN | POLLPRI)) != 0;
}
g_mutex_unlock (set->poll_lock);
break;
}
case GST_FDSET_MODE_EPOLL:
break;
}
return res;
}
gboolean
gst_fdset_fd_can_write (GstFDSet * set, GstFD * fd)
{
gboolean res = FALSE;
g_return_val_if_fail (set != NULL, FALSE);
g_return_val_if_fail (fd != NULL, FALSE);
switch (set->mode) {
case GST_FDSET_MODE_SELECT:
res = FD_ISSET (fd->fd, &set->testwritefds);
break;
case GST_FDSET_MODE_POLL:
{
gint idx;
g_mutex_lock (set->poll_lock);
idx = fd->idx;
if (idx >= 0 && idx < set->last_testpollfds) {
res = (set->testpollfds[idx].revents & POLLOUT) != 0;
}
g_mutex_unlock (set->poll_lock);
break;
}
case GST_FDSET_MODE_EPOLL:
break;
}
return res;
}
gint
gst_fdset_wait (GstFDSet * set, int timeout)
{
int res = -1;
g_return_val_if_fail (set != NULL, -1);
switch (set->mode) {
case GST_FDSET_MODE_SELECT:
{
struct timeval tv;
struct timeval *tvptr;
set->testreadfds = set->readfds;
set->testwritefds = set->writefds;
if (timeout > 0) {
tv.tv_sec = timeout / 1000;
tv.tv_usec = timeout % 1000;
tvptr = &tv;
} else {
tvptr = NULL;
}
res =
select (FD_SETSIZE, &set->testreadfds, &set->testwritefds,
(fd_set *) 0, tvptr);
break;
}
case GST_FDSET_MODE_POLL:
{
g_mutex_lock (set->poll_lock);
if (set->testsize != set->size) {
set->testpollfds = resize (set->testpollfds, set->testsize, set->size);
set->testsize = set->size;
}
set->last_testpollfds = set->last_pollfds;
memcpy (set->testpollfds, set->pollfds,
sizeof (struct pollfd) * set->last_testpollfds);
g_mutex_unlock (set->poll_lock);
res = poll (set->testpollfds, set->last_testpollfds, timeout);
break;
}
case GST_FDSET_MODE_EPOLL:
break;
}
return res;
}

View file

@ -1,68 +0,0 @@
/* GStreamer
* Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu>
* Copyright (C) <2004> Wim Taymans <wim@fluendo.com>
*
* gstfdset.h: fdset datastructure
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 59 Temple Place - Suite 330,
* Boston, MA 02111-1307, USA.
*/
#ifndef __GST_FDSET_H__
#define __GST_FDSET_H__
#include <gst/gst.h>
G_BEGIN_DECLS
typedef struct _GstFDSet GstFDSet;
typedef struct {
int fd;
gint idx;
} GstFD;
typedef enum {
GST_FDSET_MODE_SELECT,
GST_FDSET_MODE_POLL,
GST_FDSET_MODE_EPOLL
} GstFDSetMode;
#define GST_TYPE_FDSET_MODE (gst_fdset_mode_get_type())
GType gst_fdset_mode_get_type (void);
GstFDSet* gst_fdset_new (GstFDSetMode mode);
void gst_fdset_free (GstFDSet *set);
void gst_fdset_set_mode (GstFDSet *set, GstFDSetMode mode);
GstFDSetMode gst_fdset_get_mode (GstFDSet *set);
gboolean gst_fdset_add_fd (GstFDSet *set, GstFD *fd);
gboolean gst_fdset_remove_fd (GstFDSet *set, GstFD *fd);
void gst_fdset_fd_ctl_write (GstFDSet *set, GstFD *fd, gboolean active);
void gst_fdset_fd_ctl_read (GstFDSet *set, GstFD *fd, gboolean active);
gboolean gst_fdset_fd_has_closed (GstFDSet *set, GstFD *fd);
gboolean gst_fdset_fd_has_error (GstFDSet *set, GstFD *fd);
gboolean gst_fdset_fd_can_read (GstFDSet *set, GstFD *fd);
gboolean gst_fdset_fd_can_write (GstFDSet *set, GstFD *fd);
gint gst_fdset_wait (GstFDSet *set, int timeout);
G_END_DECLS
#endif /* __GST_FDSET_H__ */

View file

@ -132,25 +132,6 @@
#define NOT_IMPLEMENTED 0
/* the select call is also performed on the control sockets, that way
* we can send special commands to unblock or restart the select call */
#define CONTROL_RESTART 'R' /* restart the select call */
#define CONTROL_STOP 'S' /* stop the select call */
#define CONTROL_SOCKETS(sink) sink->control_sock
#define WRITE_SOCKET(sink) sink->control_sock[1]
#define READ_SOCKET(sink) sink->control_sock[0]
#define SEND_COMMAND(sink, command) \
G_STMT_START { \
unsigned char c; c = command; \
write (WRITE_SOCKET(sink).fd, &c, 1); \
} G_STMT_END
#define READ_COMMAND(sink, command, res) \
G_STMT_START { \
res = read(READ_SOCKET(sink).fd, &command, 1);\
} G_STMT_END
/* elementfactory information */
static const GstElementDetails gst_multi_fd_sink_details =
GST_ELEMENT_DETAILS ("Multi filedescriptor sink",
@ -188,7 +169,7 @@ enum
/* this is really arbitrarily chosen */
#define DEFAULT_PROTOCOL GST_TCP_PROTOCOL_NONE
#define DEFAULT_MODE GST_FDSET_MODE_POLL
#define DEFAULT_MODE GST_POLL_MODE_AUTO
#define DEFAULT_BUFFERS_MAX -1
#define DEFAULT_BUFFERS_SOFT_MAX -1
#define DEFAULT_TIME_MIN -1
@ -380,7 +361,7 @@ gst_multi_fd_sink_class_init (GstMultiFdSinkClass * klass)
GST_TYPE_TCP_PROTOCOL, DEFAULT_PROTOCOL, G_PARAM_READWRITE));
g_object_class_install_property (gobject_class, PROP_MODE,
g_param_spec_enum ("mode", "Mode",
"The mode for selecting activity on the fds", GST_TYPE_FDSET_MODE,
"The mode for selecting activity on the fds", GST_TYPE_POLL_MODE,
DEFAULT_MODE, G_PARAM_READWRITE));
g_object_class_install_property (gobject_class, PROP_BUFFERS_MAX,
@ -733,12 +714,12 @@ gst_multi_fd_sink_add_full (GstMultiFdSink * sink, int fd,
/* set the socket to non blocking */
res = fcntl (fd, F_SETFL, O_NONBLOCK);
/* we always read from a client */
gst_fdset_add_fd (sink->fdset, &client->fd);
gst_poll_add_fd (sink->fdset, &client->fd);
/* we don't try to read from write only fds */
flags = fcntl (fd, F_GETFL, 0);
if ((flags & O_ACCMODE) != O_WRONLY) {
gst_fdset_fd_ctl_read (sink->fdset, &client->fd, TRUE);
gst_poll_fd_ctl_read (sink->fdset, &client->fd, TRUE);
}
/* figure out the mode, can't use send() for non sockets */
res = fstat (fd, &statbuf);
@ -746,7 +727,7 @@ gst_multi_fd_sink_add_full (GstMultiFdSink * sink, int fd,
client->is_socket = TRUE;
}
SEND_COMMAND (sink, CONTROL_RESTART);
gst_poll_restart (sink->fdset);
CLIENTS_UNLOCK (sink);
@ -807,7 +788,7 @@ gst_multi_fd_sink_remove (GstMultiFdSink * sink, int fd)
client->status = GST_CLIENT_STATUS_REMOVED;
gst_multi_fd_sink_remove_client_link (sink, clink);
SEND_COMMAND (sink, CONTROL_RESTART);
gst_poll_restart (sink->fdset);
} else {
GST_WARNING_OBJECT (sink, "[fd %5d] no client with this fd found!", fd);
}
@ -877,7 +858,7 @@ restart:
client->status = GST_CLIENT_STATUS_REMOVED;
gst_multi_fd_sink_remove_client_link (sink, clients);
}
SEND_COMMAND (sink, CONTROL_RESTART);
gst_poll_restart (sink->fdset);
CLIENTS_UNLOCK (sink);
}
@ -1010,7 +991,7 @@ gst_multi_fd_sink_remove_client_link (GstMultiFdSink * sink, GList * link)
break;
}
gst_fdset_remove_fd (sink->fdset, &client->fd);
gst_poll_remove_fd (sink->fdset, &client->fd);
g_get_current_time (&now);
client->disconnect_time = GST_TIMEVAL_TO_TIME (now);
@ -1877,7 +1858,7 @@ gst_multi_fd_sink_handle_client_write (GstMultiFdSink * sink,
if (client->bufpos == -1) {
/* client is too fast, remove from write queue until new buffer is
* available */
gst_fdset_fd_ctl_write (sink->fdset, &client->fd, FALSE);
gst_poll_fd_ctl_write (sink->fdset, &client->fd, FALSE);
/* if we flushed out all of the client buffers, we can stop */
if (client->flushcount == 0)
goto flushed;
@ -1898,7 +1879,7 @@ gst_multi_fd_sink_handle_client_write (GstMultiFdSink * sink,
client->bufpos = position;
} else {
/* cannot send data to this client yet */
gst_fdset_fd_ctl_write (sink->fdset, &client->fd, FALSE);
gst_poll_fd_ctl_write (sink->fdset, &client->fd, FALSE);
return TRUE;
}
}
@ -2164,7 +2145,7 @@ restart:
} else if (client->bufpos == 0 || client->new_connection) {
/* can send data to this client now. need to signal the select thread that
* the fd_set changed */
gst_fdset_fd_ctl_write (sink->fdset, &client->fd, TRUE);
gst_poll_fd_ctl_write (sink->fdset, &client->fd, TRUE);
need_signal = TRUE;
}
/* keep track of maximum buffer usage */
@ -2241,7 +2222,7 @@ restart:
/* and send a signal to thread if fd_set changed */
if (need_signal) {
SEND_COMMAND (sink, CONTROL_RESTART);
gst_poll_restart (sink->fdset);
}
}
@ -2265,8 +2246,6 @@ gst_multi_fd_sink_handle_clients (GstMultiFdSink * sink)
fclass = GST_MULTI_FD_SINK_GET_CLASS (sink);
do {
gboolean stop = FALSE;
try_again = FALSE;
/* check for:
@ -2274,7 +2253,7 @@ gst_multi_fd_sink_handle_clients (GstMultiFdSink * sink)
* - client socket input (ie, clients saying goodbye)
* - client socket output (ie, client reads) */
GST_LOG_OBJECT (sink, "waiting on action on fdset");
result = gst_fdset_wait (sink->fdset, -1);
result = gst_poll_wait (sink->fdset, GST_CLOCK_TIME_NONE);
/* < 0 is an error, 0 just means a timeout happened, which is impossible */
if (result < 0) {
@ -2317,8 +2296,11 @@ gst_multi_fd_sink_handle_clients (GstMultiFdSink * sink)
* are not valid */
try_again = TRUE;
} else if (errno == EINTR) {
/* interrupted system call, just redo the select */
/* interrupted system call, just redo the wait */
try_again = TRUE;
} else if (errno == EBUSY) {
/* the call to gst_poll_wait() was flushed */
return;
} else {
/* this is quite bad... */
GST_ELEMENT_ERROR (sink, RESOURCE, READ, (NULL),
@ -2327,46 +2309,6 @@ gst_multi_fd_sink_handle_clients (GstMultiFdSink * sink)
}
} else {
GST_LOG_OBJECT (sink, "wait done: %d sockets with events", result);
/* read all commands */
if (gst_fdset_fd_can_read (sink->fdset, &READ_SOCKET (sink))) {
GST_LOG_OBJECT (sink, "have a command");
while (TRUE) {
gchar command;
int res;
READ_COMMAND (sink, command, res);
if (res <= 0) {
GST_LOG_OBJECT (sink, "no more commands");
/* no more commands */
break;
}
switch (command) {
case CONTROL_RESTART:
GST_LOG_OBJECT (sink, "restart");
/* need to restart the select call as the fd_set changed */
/* if other file descriptors than the READ_SOCKET had activity,
* we don't restart just yet, but handle the other clients first
*/
if (result == 1)
try_again = TRUE;
break;
case CONTROL_STOP:
/* break out of the select loop */
GST_LOG_OBJECT (sink, "stop");
/* stop this function */
stop = TRUE;
break;
default:
GST_WARNING_OBJECT (sink, "unkown");
g_warning ("multifdsink: unknown control message received");
break;
}
}
}
}
if (stop) {
return;
}
} while (try_again);
@ -2396,25 +2338,25 @@ restart2:
continue;
}
if (gst_fdset_fd_has_closed (sink->fdset, &client->fd)) {
if (gst_poll_fd_has_closed (sink->fdset, &client->fd)) {
client->status = GST_CLIENT_STATUS_CLOSED;
gst_multi_fd_sink_remove_client_link (sink, clients);
continue;
}
if (gst_fdset_fd_has_error (sink->fdset, &client->fd)) {
GST_WARNING_OBJECT (sink, "gst_fdset_fd_has_error for %d", client->fd.fd);
if (gst_poll_fd_has_error (sink->fdset, &client->fd)) {
GST_WARNING_OBJECT (sink, "gst_poll_fd_has_error for %d", client->fd.fd);
client->status = GST_CLIENT_STATUS_ERROR;
gst_multi_fd_sink_remove_client_link (sink, clients);
continue;
}
if (gst_fdset_fd_can_read (sink->fdset, &client->fd)) {
if (gst_poll_fd_can_read (sink->fdset, &client->fd)) {
/* handle client read */
if (!gst_multi_fd_sink_handle_client_read (sink, client)) {
gst_multi_fd_sink_remove_client_link (sink, clients);
continue;
}
}
if (gst_fdset_fd_can_write (sink->fdset, &client->fd)) {
if (gst_poll_fd_can_write (sink->fdset, &client->fd)) {
/* handle client write */
if (!gst_multi_fd_sink_handle_client_write (sink, client)) {
gst_multi_fd_sink_remove_client_link (sink, clients);
@ -2679,7 +2621,6 @@ static gboolean
gst_multi_fd_sink_start (GstBaseSink * bsink)
{
GstMultiFdSinkClass *fclass;
int control_socket[2];
GstMultiFdSink *this;
if (GST_OBJECT_FLAG_IS_SET (bsink, GST_MULTI_FD_SINK_OPEN))
@ -2689,20 +2630,9 @@ gst_multi_fd_sink_start (GstBaseSink * bsink)
fclass = GST_MULTI_FD_SINK_GET_CLASS (this);
GST_INFO_OBJECT (this, "starting in mode %d", this->mode);
this->fdset = gst_fdset_new (this->mode);
if (socketpair (PF_UNIX, SOCK_STREAM, 0, control_socket) < 0)
if ((this->fdset = gst_poll_new (this->mode, TRUE)) == NULL)
goto socket_pair;
READ_SOCKET (this).fd = control_socket[0];
WRITE_SOCKET (this).fd = control_socket[1];
gst_fdset_add_fd (this->fdset, &READ_SOCKET (this));
gst_fdset_fd_ctl_read (this->fdset, &READ_SOCKET (this), TRUE);
fcntl (READ_SOCKET (this).fd, F_SETFL, O_NONBLOCK);
fcntl (WRITE_SOCKET (this).fd, F_SETFL, O_NONBLOCK);
this->streamheader = NULL;
this->bytes_to_serve = 0;
this->bytes_served = 0;
@ -2750,7 +2680,7 @@ gst_multi_fd_sink_stop (GstBaseSink * bsink)
this->running = FALSE;
SEND_COMMAND (this, CONTROL_STOP);
gst_poll_set_flushing (this->fdset, TRUE);
if (this->thread) {
GST_DEBUG_OBJECT (this, "joining thread");
g_thread_join (this->thread);
@ -2761,9 +2691,6 @@ gst_multi_fd_sink_stop (GstBaseSink * bsink)
/* free the clients */
gst_multi_fd_sink_clear (this);
close (READ_SOCKET (this).fd);
close (WRITE_SOCKET (this).fd);
if (this->streamheader) {
g_slist_foreach (this->streamheader, (GFunc) gst_mini_object_unref, NULL);
g_slist_free (this->streamheader);
@ -2774,8 +2701,7 @@ gst_multi_fd_sink_stop (GstBaseSink * bsink)
fclass->close (this);
if (this->fdset) {
gst_fdset_remove_fd (this->fdset, &READ_SOCKET (this));
gst_fdset_free (this->fdset);
gst_poll_free (this->fdset);
this->fdset = NULL;
}
g_hash_table_foreach_remove (this->fd_hash, multifdsink_hash_remove, this);

View file

@ -28,7 +28,6 @@
G_BEGIN_DECLS
#include "gsttcp.h"
#include "gstfdset.h"
#define GST_TYPE_MULTI_FD_SINK \
(gst_multi_fd_sink_get_type())
@ -140,7 +139,7 @@ typedef enum
/* structure for a client
*/
typedef struct {
GstFD fd;
GstPollFD fd;
gint bufpos; /* position of this client in the global queue */
gint flushcount; /* the remaining number of buffers to flush out or -1 if the
@ -201,10 +200,8 @@ struct _GstMultiFdSink {
GHashTable *fd_hash; /* index on fd to client */
guint clients_cookie; /* Cookie to detect changes to the clients list */
GstFDSetMode mode;
GstFDSet *fdset;
GstFD control_sock[2];/* sockets for controlling the select call */
GstPollMode mode;
GstPoll *fdset;
GSList *streamheader; /* GSList of GstBuffers to use as streamheader */
gboolean previous_buffer_in_caps;
@ -259,7 +256,7 @@ struct _GstMultiFdSinkClass {
/* vtable */
gboolean (*init) (GstMultiFdSink *sink);
gboolean (*wait) (GstMultiFdSink *sink, GstFDSet *set);
gboolean (*wait) (GstMultiFdSink *sink, GstPoll *set);
gboolean (*close) (GstMultiFdSink *sink);
void (*removed) (GstMultiFdSink *sink, int fd);

View file

@ -29,7 +29,6 @@
#include <netinet/in.h>
#include <arpa/inet.h>
#include <netdb.h>
#include <string.h> /* memset, in FD_ZERO macro */
#include <unistd.h>
#include <sys/ioctl.h>
@ -128,30 +127,24 @@ gst_tcp_socket_write (int socket, const void *buf, size_t count)
*/
static GstFlowReturn
gst_tcp_socket_read (GstElement * this, int socket, void *buf, size_t count,
int cancel_fd)
GstPoll * fdset)
{
fd_set testfds;
int maxfdp1;
ssize_t n;
size_t bytes_read;
int num_to_read;
int ret;
bytes_read = 0;
while (bytes_read < count) {
/* do a blocking select on the socket */
FD_ZERO (&testfds);
FD_SET (socket, &testfds);
if (cancel_fd >= 0)
FD_SET (cancel_fd, &testfds);
maxfdp1 = MAX (socket, cancel_fd) + 1;
/* no action (0) is an error too in our case */
if (select (maxfdp1, &testfds, NULL, NULL, 0) <= 0)
goto select_error;
if (cancel_fd >= 0 && FD_ISSET (cancel_fd, &testfds))
goto cancelled;
if ((ret = gst_poll_wait (fdset, GST_CLOCK_TIME_NONE)) <= 0) {
if (ret == -1 && errno == EBUSY)
goto cancelled;
else
goto select_error;
}
/* ask how much is available for reading on the socket */
if (ioctl (socket, FIONREAD, &num_to_read) < 0)
@ -216,10 +209,12 @@ short_read:
/* close the socket and reset the fd. Used to clean up after errors. */
void
gst_tcp_socket_close (int *socket)
gst_tcp_socket_close (GstPollFD * socket)
{
close (*socket);
*socket = -1;
if (socket->fd >= 0) {
close (socket->fd);
socket->fd = -1;
}
}
/* read a buffer from the given socket
@ -229,30 +224,23 @@ gst_tcp_socket_close (int *socket)
* EOS
*/
GstFlowReturn
gst_tcp_read_buffer (GstElement * this, int socket, int cancel_fd,
gst_tcp_read_buffer (GstElement * this, int socket, GstPoll * fdset,
GstBuffer ** buf)
{
fd_set testfds;
int ret;
int maxfdp1;
ssize_t bytes_read;
int readsize;
*buf = NULL;
/* do a blocking select on the socket */
FD_ZERO (&testfds);
FD_SET (socket, &testfds);
if (cancel_fd >= 0)
FD_SET (cancel_fd, &testfds);
maxfdp1 = MAX (socket, cancel_fd) + 1;
/* no action (0) is an error too in our case */
if ((ret = select (maxfdp1, &testfds, NULL, NULL, 0)) <= 0)
goto select_error;
if (cancel_fd >= 0 && FD_ISSET (cancel_fd, &testfds))
goto cancelled;
if ((ret = gst_poll_wait (fdset, GST_CLOCK_TIME_NONE)) <= 0) {
if (ret == -1 && errno == EBUSY)
goto cancelled;
else
goto select_error;
}
/* ask how much is available for reading on the socket */
if ((ret = ioctl (socket, FIONREAD, &readsize)) < 0)
@ -326,7 +314,7 @@ short_read:
* EOS
*/
GstFlowReturn
gst_tcp_gdp_read_buffer (GstElement * this, int socket, int cancel_fd,
gst_tcp_gdp_read_buffer (GstElement * this, int socket, GstPoll * fdset,
GstBuffer ** buf)
{
GstFlowReturn ret;
@ -338,8 +326,7 @@ gst_tcp_gdp_read_buffer (GstElement * this, int socket, int cancel_fd,
*buf = NULL;
header = g_malloc (GST_DP_HEADER_LENGTH);
ret = gst_tcp_socket_read (this, socket, header, GST_DP_HEADER_LENGTH,
cancel_fd);
ret = gst_tcp_socket_read (this, socket, header, GST_DP_HEADER_LENGTH, fdset);
if (ret != GST_FLOW_OK)
goto header_read_error;
@ -357,7 +344,7 @@ gst_tcp_gdp_read_buffer (GstElement * this, int socket, int cancel_fd,
g_free (header);
ret = gst_tcp_socket_read (this, socket, GST_BUFFER_DATA (*buf),
GST_BUFFER_SIZE (*buf), cancel_fd);
GST_BUFFER_SIZE (*buf), fdset);
if (ret != GST_FLOW_OK)
goto data_read_error;
@ -394,7 +381,7 @@ data_read_error:
}
GstFlowReturn
gst_tcp_gdp_read_caps (GstElement * this, int socket, int cancel_fd,
gst_tcp_gdp_read_caps (GstElement * this, int socket, GstPoll * fdset,
GstCaps ** caps)
{
GstFlowReturn ret;
@ -408,8 +395,7 @@ gst_tcp_gdp_read_caps (GstElement * this, int socket, int cancel_fd,
*caps = NULL;
header = g_malloc (GST_DP_HEADER_LENGTH);
ret = gst_tcp_socket_read (this, socket, header, GST_DP_HEADER_LENGTH,
cancel_fd);
ret = gst_tcp_socket_read (this, socket, header, GST_DP_HEADER_LENGTH, fdset);
if (ret != GST_FLOW_OK)
goto header_read_error;
@ -429,7 +415,7 @@ gst_tcp_gdp_read_caps (GstElement * this, int socket, int cancel_fd,
"Reading %" G_GSIZE_FORMAT " bytes for caps packet payload",
payload_length);
ret = gst_tcp_socket_read (this, socket, payload, payload_length, cancel_fd);
ret = gst_tcp_socket_read (this, socket, payload, payload_length, fdset);
if (ret != GST_FLOW_OK)
goto payload_read_error;

View file

@ -44,14 +44,14 @@ gchar * gst_tcp_host_to_ip (GstElement *element, const gchar *host);
gint gst_tcp_socket_write (int socket, const void *buf, size_t count);
void gst_tcp_socket_close (int *socket);
void gst_tcp_socket_close (GstPollFD *socket);
GstFlowReturn gst_tcp_read_buffer (GstElement * this, int socket, int cancel_fd, GstBuffer **buf);
GstFlowReturn gst_tcp_read_buffer (GstElement * this, int socket, GstPoll * fdset, GstBuffer **buf);
GstFlowReturn gst_tcp_gdp_read_buffer (GstElement * this, int socket, int cancel_fd, GstBuffer **buf);
GstFlowReturn gst_tcp_gdp_read_caps (GstElement * this, int socket, int cancel_fd, GstCaps **caps);
GstFlowReturn gst_tcp_gdp_read_buffer (GstElement * this, int socket, GstPoll * fdset, GstBuffer **buf);
GstFlowReturn gst_tcp_gdp_read_caps (GstElement * this, int socket, GstPoll * fdset, GstCaps **caps);
GstEvent * gst_tcp_gdp_read_event (GstElement *elem, int socket, int cancel_fd);
GstEvent * gst_tcp_gdp_read_event (GstElement *elem, int socket, GstPoll * fdset);
gboolean gst_tcp_gdp_write_buffer (GstElement *elem, int socket, GstBuffer *buffer, gboolean fatal, const gchar *host, int port);
gboolean gst_tcp_gdp_write_event (GstElement *elem, int socket, GstEvent *event, gboolean fatal, const gchar *host, int port);

View file

@ -160,7 +160,7 @@ gst_tcp_client_sink_init (GstTCPClientSink * this)
this->host = g_strdup (TCP_DEFAULT_HOST);
this->port = TCP_DEFAULT_PORT;
this->sock_fd = -1;
this->sock_fd.fd = -1;
this->protocol = GST_TCP_PROTOCOL_NONE;
GST_OBJECT_FLAG_UNSET (this, GST_TCP_CLIENT_SINK_OPEN);
}
@ -198,8 +198,8 @@ gst_tcp_client_sink_setcaps (GstBaseSink * bsink, GstCaps * caps)
GST_DEBUG_OBJECT (sink, "Sending caps %s through GDP", string);
g_free (string);
if (!gst_tcp_gdp_write_caps (GST_ELEMENT (sink), sink->sock_fd, caps,
TRUE, sink->host, sink->port))
if (!gst_tcp_gdp_write_caps (GST_ELEMENT (sink), sink->sock_fd.fd,
caps, TRUE, sink->host, sink->port))
goto gdp_write_error;
sink->caps_sent = TRUE;
@ -241,7 +241,7 @@ gst_tcp_client_sink_render (GstBaseSink * bsink, GstBuffer * buf)
break;
case GST_TCP_PROTOCOL_GDP:
GST_LOG_OBJECT (sink, "Sending buffer header through GDP");
if (!gst_tcp_gdp_write_buffer (GST_ELEMENT (sink), sink->sock_fd, buf,
if (!gst_tcp_gdp_write_buffer (GST_ELEMENT (sink), sink->sock_fd.fd, buf,
TRUE, sink->host, sink->port))
goto gdp_write_error;
break;
@ -250,7 +250,7 @@ gst_tcp_client_sink_render (GstBaseSink * bsink, GstBuffer * buf)
}
/* write buffer data */
wrote = gst_tcp_socket_write (sink->sock_fd, GST_BUFFER_DATA (buf), size);
wrote = gst_tcp_socket_write (sink->sock_fd.fd, GST_BUFFER_DATA (buf), size);
if (wrote < size)
goto write_error;
@ -348,12 +348,12 @@ gst_tcp_client_sink_start (GstTCPClientSink * this)
/* create sending client socket */
GST_DEBUG_OBJECT (this, "opening sending client socket to %s:%d", this->host,
this->port);
if ((this->sock_fd = socket (AF_INET, SOCK_STREAM, 0)) == -1) {
if ((this->sock_fd.fd = socket (AF_INET, SOCK_STREAM, 0)) == -1) {
GST_ELEMENT_ERROR (this, RESOURCE, OPEN_WRITE, (NULL), GST_ERROR_SYSTEM);
return FALSE;
}
GST_DEBUG_OBJECT (this, "opened sending client socket with fd %d",
this->sock_fd);
this->sock_fd.fd);
/* look up name if we need to */
ip = gst_tcp_host_to_ip (GST_ELEMENT (this), this->host);
@ -371,7 +371,7 @@ gst_tcp_client_sink_start (GstTCPClientSink * this)
g_free (ip);
GST_DEBUG_OBJECT (this, "connecting to server");
ret = connect (this->sock_fd, (struct sockaddr *) &this->server_sin,
ret = connect (this->sock_fd.fd, (struct sockaddr *) &this->server_sin,
sizeof (this->server_sin));
if (ret) {
@ -405,10 +405,7 @@ gst_tcp_client_sink_stop (GstTCPClientSink * this)
if (!GST_OBJECT_FLAG_IS_SET (this, GST_TCP_CLIENT_SINK_OPEN))
return TRUE;
if (this->sock_fd != -1) {
close (this->sock_fd);
this->sock_fd = -1;
}
gst_tcp_socket_close (&this->sock_fd);
GST_OBJECT_FLAG_UNSET (this, GST_TCP_CLIENT_SINK_OPEN);

View file

@ -73,7 +73,7 @@ struct _GstTCPClientSink {
struct sockaddr_in server_sin;
/* socket */
int sock_fd;
GstPollFD sock_fd;
size_t data_written; /* how much bytes have we written ? */
GstTCPProtocol protocol; /* used with the protocol enum */

View file

@ -32,24 +32,6 @@
#include <fcntl.h>
/* control stuff stolen from fdsrc */
#define CONTROL_STOP 'S' /* stop the select call */
#define CONTROL_SOCKETS(o) o->control_fds
#define WRITE_SOCKET(o) o->control_fds[1]
#define READ_SOCKET(o) o->control_fds[0]
#define SEND_COMMAND(o, command) \
G_STMT_START { \
unsigned char c; c = command; \
write (WRITE_SOCKET(o), &c, 1); \
} G_STMT_END
#define READ_COMMAND(o, command, res) \
G_STMT_START { \
res = read(READ_SOCKET(o), &command, 1); \
} G_STMT_END
GST_DEBUG_CATEGORY_STATIC (tcpclientsrc_debug);
#define GST_CAT_DEFAULT tcpclientsrc_debug
@ -150,13 +132,10 @@ gst_tcp_client_src_init (GstTCPClientSrc * this, GstTCPClientSrcClass * g_class)
{
this->port = TCP_DEFAULT_PORT;
this->host = g_strdup (TCP_DEFAULT_HOST);
this->sock_fd = -1;
this->sock_fd.fd = -1;
this->protocol = GST_TCP_PROTOCOL_NONE;
this->caps = NULL;
READ_SOCKET (this) = -1;
WRITE_SOCKET (this) = -1;
gst_base_src_set_live (GST_BASE_SRC (this), TRUE);
GST_OBJECT_FLAG_UNSET (this, GST_TCP_CLIENT_SRC_OPEN);
@ -207,8 +186,8 @@ gst_tcp_client_src_create (GstPushSrc * psrc, GstBuffer ** outbuf)
/* read the buffer header if we're using a protocol */
switch (src->protocol) {
case GST_TCP_PROTOCOL_NONE:
ret = gst_tcp_read_buffer (GST_ELEMENT (src), src->sock_fd,
READ_SOCKET (src), outbuf);
ret = gst_tcp_read_buffer (GST_ELEMENT (src), src->sock_fd.fd,
src->fdset, outbuf);
break;
case GST_TCP_PROTOCOL_GDP:
@ -217,8 +196,8 @@ gst_tcp_client_src_create (GstPushSrc * psrc, GstBuffer ** outbuf)
GstCaps *caps;
GST_DEBUG_OBJECT (src, "getting caps through GDP");
ret = gst_tcp_gdp_read_caps (GST_ELEMENT (src), src->sock_fd,
READ_SOCKET (src), &caps);
ret = gst_tcp_gdp_read_caps (GST_ELEMENT (src), src->sock_fd.fd,
src->fdset, &caps);
if (ret != GST_FLOW_OK)
goto no_caps;
@ -227,8 +206,8 @@ gst_tcp_client_src_create (GstPushSrc * psrc, GstBuffer ** outbuf)
src->caps = caps;
}
ret = gst_tcp_gdp_read_buffer (GST_ELEMENT (src), src->sock_fd,
READ_SOCKET (src), outbuf);
ret = gst_tcp_gdp_read_buffer (GST_ELEMENT (src), src->sock_fd.fd,
src->fdset, outbuf);
break;
default:
/* need to assert as buf == NULL */
@ -323,22 +302,18 @@ gst_tcp_client_src_start (GstBaseSrc * bsrc)
gchar *ip;
GstTCPClientSrc *src = GST_TCP_CLIENT_SRC (bsrc);
/* create the control sockets before anything */
if (socketpair (PF_UNIX, SOCK_STREAM, 0, CONTROL_SOCKETS (src)) < 0)
if ((src->fdset = gst_poll_new (GST_POLL_MODE_AUTO, TRUE)) == NULL)
goto socket_pair;
fcntl (READ_SOCKET (src), F_SETFL, O_NONBLOCK);
fcntl (WRITE_SOCKET (src), F_SETFL, O_NONBLOCK);
/* create receiving client socket */
GST_DEBUG_OBJECT (src, "opening receiving client socket to %s:%d",
src->host, src->port);
if ((src->sock_fd = socket (AF_INET, SOCK_STREAM, 0)) == -1)
if ((src->sock_fd.fd = socket (AF_INET, SOCK_STREAM, 0)) == -1)
goto no_socket;
GST_DEBUG_OBJECT (src, "opened receiving client socket with fd %d",
src->sock_fd);
src->sock_fd.fd);
GST_OBJECT_FLAG_SET (src, GST_TCP_CLIENT_SRC_OPEN);
/* look up name if we need to */
@ -355,7 +330,7 @@ gst_tcp_client_src_start (GstBaseSrc * bsrc)
g_free (ip);
GST_DEBUG_OBJECT (src, "connecting to server");
ret = connect (src->sock_fd, (struct sockaddr *) &src->server_sin,
ret = connect (src->sock_fd.fd, (struct sockaddr *) &src->server_sin,
sizeof (src->server_sin));
if (ret) {
@ -406,10 +381,13 @@ gst_tcp_client_src_stop (GstBaseSrc * bsrc)
src = GST_TCP_CLIENT_SRC (bsrc);
GST_DEBUG_OBJECT (src, "closing socket");
if (src->sock_fd != -1) {
close (src->sock_fd);
src->sock_fd = -1;
if (src->fdset != NULL) {
gst_poll_free (src->fdset);
src->fdset = NULL;
}
gst_tcp_socket_close (&src->sock_fd);
src->caps_received = FALSE;
if (src->caps) {
gst_caps_unref (src->caps);
@ -417,11 +395,6 @@ gst_tcp_client_src_stop (GstBaseSrc * bsrc)
}
GST_OBJECT_FLAG_UNSET (src, GST_TCP_CLIENT_SRC_OPEN);
close (READ_SOCKET (src));
close (WRITE_SOCKET (src));
READ_SOCKET (src) = -1;
WRITE_SOCKET (src) = -1;
return TRUE;
}
@ -431,7 +404,7 @@ gst_tcp_client_src_unlock (GstBaseSrc * bsrc)
{
GstTCPClientSrc *src = GST_TCP_CLIENT_SRC (bsrc);
SEND_COMMAND (src, CONTROL_STOP);
gst_poll_set_flushing (src->fdset, TRUE);
return TRUE;
}

View file

@ -64,8 +64,8 @@ struct _GstTCPClientSrc {
struct sockaddr_in server_sin;
/* socket */
int sock_fd;
int control_fds[2];
GstPollFD sock_fd;
GstPoll *fdset;
GstTCPProtocol protocol; /* protocol used for reading data */
gboolean caps_received; /* if we have received caps yet */

View file

@ -63,7 +63,7 @@ enum
static void gst_tcp_server_sink_finalize (GObject * gobject);
static gboolean gst_tcp_server_sink_handle_wait (GstMultiFdSink * sink,
GstFDSet * set);
GstPoll * set);
static gboolean gst_tcp_server_sink_init_send (GstMultiFdSink * this);
static gboolean gst_tcp_server_sink_close (GstMultiFdSink * this);
static void gst_tcp_server_sink_removed (GstMultiFdSink * sink, int fd);
@ -184,11 +184,11 @@ gst_tcp_server_sink_removed (GstMultiFdSink * sink, int fd)
}
static gboolean
gst_tcp_server_sink_handle_wait (GstMultiFdSink * sink, GstFDSet * set)
gst_tcp_server_sink_handle_wait (GstMultiFdSink * sink, GstPoll * set)
{
GstTCPServerSink *this = GST_TCP_SERVER_SINK (sink);
if (gst_fdset_fd_can_read (set, &this->server_sock)) {
if (gst_poll_fd_can_read (set, &this->server_sock)) {
/* handle new client connection on server socket */
if (!gst_tcp_server_sink_handle_server_read (this)) {
GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
@ -270,7 +270,7 @@ gst_tcp_server_sink_init_send (GstMultiFdSink * parent)
ret = 1;
if (setsockopt (this->server_sock.fd, SOL_SOCKET, SO_REUSEADDR,
(void *) &ret, sizeof (ret)) < 0) {
gst_tcp_socket_close (&this->server_sock.fd);
gst_tcp_socket_close (&this->server_sock);
GST_ELEMENT_ERROR (this, RESOURCE, SETTINGS, (NULL),
("Could not setsockopt: %s", g_strerror (errno)));
return FALSE;
@ -279,7 +279,7 @@ gst_tcp_server_sink_init_send (GstMultiFdSink * parent)
ret = 1;
if (setsockopt (this->server_sock.fd, SOL_SOCKET, SO_KEEPALIVE,
(void *) &ret, sizeof (ret)) < 0) {
gst_tcp_socket_close (&this->server_sock.fd);
gst_tcp_socket_close (&this->server_sock);
GST_ELEMENT_ERROR (this, RESOURCE, SETTINGS, (NULL),
("Could not setsockopt: %s", g_strerror (errno)));
return FALSE;
@ -297,7 +297,7 @@ gst_tcp_server_sink_init_send (GstMultiFdSink * parent)
sizeof (this->server_sin));
if (ret) {
gst_tcp_socket_close (&this->server_sock.fd);
gst_tcp_socket_close (&this->server_sock);
switch (errno) {
default:
GST_ELEMENT_ERROR (this, RESOURCE, OPEN_READ, (NULL),
@ -314,7 +314,7 @@ gst_tcp_server_sink_init_send (GstMultiFdSink * parent)
GST_DEBUG_OBJECT (this, "listening on server socket %d with queue of %d",
this->server_sock.fd, TCP_BACKLOG);
if (listen (this->server_sock.fd, TCP_BACKLOG) == -1) {
gst_tcp_socket_close (&this->server_sock.fd);
gst_tcp_socket_close (&this->server_sock);
GST_ELEMENT_ERROR (this, RESOURCE, OPEN_READ, (NULL),
("Could not listen on server socket: %s", g_strerror (errno)));
return FALSE;
@ -323,8 +323,8 @@ gst_tcp_server_sink_init_send (GstMultiFdSink * parent)
"listened on server socket %d, returning from connection setup",
this->server_sock.fd);
gst_fdset_add_fd (parent->fdset, &this->server_sock);
gst_fdset_fd_ctl_read (parent->fdset, &this->server_sock, TRUE);
gst_poll_add_fd (parent->fdset, &this->server_sock);
gst_poll_fd_ctl_read (parent->fdset, &this->server_sock, TRUE);
return TRUE;
}
@ -335,7 +335,7 @@ gst_tcp_server_sink_close (GstMultiFdSink * parent)
GstTCPServerSink *this = GST_TCP_SERVER_SINK (parent);
if (this->server_sock.fd != -1) {
gst_fdset_remove_fd (parent->fdset, &this->server_sock);
gst_poll_remove_fd (parent->fdset, &this->server_sock);
close (this->server_sock.fd);
this->server_sock.fd = -1;

View file

@ -76,7 +76,7 @@ struct _GstTCPServerSink {
struct sockaddr_in server_sin;
/* socket */
GstFD server_sock;
GstPollFD server_sock;
};
struct _GstTCPServerSinkClass {

View file

@ -32,24 +32,6 @@
#include <fcntl.h>
/* control stuff stolen from fdsrc */
#define CONTROL_STOP 'S' /* stop the select call */
#define CONTROL_SOCKETS(o) o->control_fds
#define WRITE_SOCKET(o) o->control_fds[1]
#define READ_SOCKET(o) o->control_fds[0]
#define SEND_COMMAND(o, command) \
G_STMT_START { \
unsigned char c; c = command; \
write (WRITE_SOCKET(o), &c, 1); \
} G_STMT_END
#define READ_COMMAND(o, command, res) \
G_STMT_START { \
res = read(READ_SOCKET(o), &command, 1); \
} G_STMT_END
GST_DEBUG_CATEGORY_STATIC (tcpserversrc_debug);
#define GST_CAT_DEFAULT tcpserversrc_debug
@ -147,13 +129,10 @@ gst_tcp_server_src_init (GstTCPServerSrc * src, GstTCPServerSrcClass * g_class)
{
src->server_port = TCP_DEFAULT_PORT;
src->host = g_strdup (TCP_DEFAULT_HOST);
src->server_sock_fd = -1;
src->client_sock_fd = -1;
src->server_sock_fd.fd = -1;
src->client_sock_fd.fd = -1;
src->protocol = GST_TCP_PROTOCOL_NONE;
READ_SOCKET (src) = -1;
WRITE_SOCKET (src) = -1;
GST_OBJECT_FLAG_UNSET (src, GST_TCP_SERVER_SRC_OPEN);
}
@ -172,8 +151,6 @@ gst_tcp_server_src_create (GstPushSrc * psrc, GstBuffer ** outbuf)
{
GstTCPServerSrc *src;
GstFlowReturn ret = GST_FLOW_OK;
fd_set testfds;
int maxfdp1;
src = GST_TCP_SERVER_SRC (psrc);
@ -181,36 +158,33 @@ gst_tcp_server_src_create (GstPushSrc * psrc, GstBuffer ** outbuf)
goto wrong_state;
restart:
/* do a blocking select on the socket */
FD_ZERO (&testfds);
/* always select on cancel socket */
FD_SET (READ_SOCKET (src), &testfds);
if (src->client_sock_fd >= 0) {
if (src->client_sock_fd.fd >= 0) {
/* if we have a client, wait for read */
FD_SET (src->client_sock_fd, &testfds);
maxfdp1 = MAX (src->client_sock_fd, READ_SOCKET (src)) + 1;
gst_poll_fd_ctl_read (src->fdset, &src->server_sock_fd, FALSE);
gst_poll_fd_ctl_read (src->fdset, &src->client_sock_fd, TRUE);
} else {
/* else wait on server socket for connections */
FD_SET (src->server_sock_fd, &testfds);
maxfdp1 = MAX (src->server_sock_fd, READ_SOCKET (src)) + 1;
gst_poll_fd_ctl_read (src->fdset, &src->server_sock_fd, TRUE);
}
/* no action (0) is an error too in our case */
if (select (maxfdp1, &testfds, NULL, NULL, 0) <= 0)
goto select_error;
if (FD_ISSET (READ_SOCKET (src), &testfds))
goto select_cancelled;
if ((ret = gst_poll_wait (src->fdset, GST_CLOCK_TIME_NONE)) <= 0) {
if (ret == -1 && errno == EBUSY)
goto select_cancelled;
else
goto select_error;
}
/* if we have no client socket we can accept one now */
if (src->client_sock_fd < 0) {
if (FD_ISSET (src->server_sock_fd, &testfds)) {
if ((src->client_sock_fd =
accept (src->server_sock_fd, (struct sockaddr *) &src->client_sin,
if (src->client_sock_fd.fd < 0) {
if (gst_poll_fd_can_read (src->fdset, &src->server_sock_fd)) {
if ((src->client_sock_fd.fd =
accept (src->server_sock_fd.fd,
(struct sockaddr *) &src->client_sin,
&src->client_sin_len)) == -1)
goto accept_error;
gst_poll_add_fd (src->fdset, &src->client_sock_fd);
}
/* and restart now to poll the socket. */
goto restart;
@ -220,8 +194,8 @@ restart:
switch (src->protocol) {
case GST_TCP_PROTOCOL_NONE:
ret = gst_tcp_read_buffer (GST_ELEMENT (src), src->client_sock_fd,
READ_SOCKET (src), outbuf);
ret = gst_tcp_read_buffer (GST_ELEMENT (src), src->client_sock_fd.fd,
src->fdset, outbuf);
break;
case GST_TCP_PROTOCOL_GDP:
@ -229,8 +203,8 @@ restart:
GstCaps *caps;
gchar *string;
ret = gst_tcp_gdp_read_caps (GST_ELEMENT (src), src->client_sock_fd,
READ_SOCKET (src), &caps);
ret = gst_tcp_gdp_read_caps (GST_ELEMENT (src), src->client_sock_fd.fd,
src->fdset, &caps);
if (ret == GST_FLOW_WRONG_STATE)
goto gdp_cancelled;
@ -246,8 +220,8 @@ restart:
gst_pad_set_caps (GST_BASE_SRC_PAD (psrc), caps);
}
ret = gst_tcp_gdp_read_buffer (GST_ELEMENT (src), src->client_sock_fd,
READ_SOCKET (src), outbuf);
ret = gst_tcp_gdp_read_buffer (GST_ELEMENT (src), src->client_sock_fd.fd,
src->fdset, outbuf);
if (ret == GST_FLOW_OK)
gst_buffer_set_caps (*outbuf, GST_PAD_CAPS (GST_BASE_SRC_PAD (src)));
@ -369,26 +343,19 @@ gst_tcp_server_src_start (GstBaseSrc * bsrc)
int ret;
GstTCPServerSrc *src = GST_TCP_SERVER_SRC (bsrc);
/* create the control sockets before anything */
if (socketpair (PF_UNIX, SOCK_STREAM, 0, CONTROL_SOCKETS (src)) < 0)
goto socket_pair;
fcntl (READ_SOCKET (src), F_SETFL, O_NONBLOCK);
fcntl (WRITE_SOCKET (src), F_SETFL, O_NONBLOCK);
/* reset caps_received flag */
src->caps_received = FALSE;
/* create the server listener socket */
if ((src->server_sock_fd = socket (AF_INET, SOCK_STREAM, 0)) == -1)
if ((src->server_sock_fd.fd = socket (AF_INET, SOCK_STREAM, 0)) == -1)
goto socket_error;
GST_DEBUG_OBJECT (src, "opened receiving server socket with fd %d",
src->server_sock_fd);
src->server_sock_fd.fd);
/* make address reusable */
ret = 1;
if (setsockopt (src->server_sock_fd, SOL_SOCKET, SO_REUSEADDR, &ret,
if (setsockopt (src->server_sock_fd.fd, SOL_SOCKET, SO_REUSEADDR, &ret,
sizeof (int)) < 0)
goto sock_opt;
@ -408,16 +375,22 @@ gst_tcp_server_src_start (GstBaseSrc * bsrc)
/* bind it */
GST_DEBUG_OBJECT (src, "binding server socket to address");
if ((ret = bind (src->server_sock_fd, (struct sockaddr *) &src->server_sin,
if ((ret = bind (src->server_sock_fd.fd, (struct sockaddr *) &src->server_sin,
sizeof (src->server_sin))) < 0)
goto bind_error;
GST_DEBUG_OBJECT (src, "listening on server socket %d with queue of %d",
src->server_sock_fd, TCP_BACKLOG);
src->server_sock_fd.fd, TCP_BACKLOG);
if (listen (src->server_sock_fd, TCP_BACKLOG) == -1)
if (listen (src->server_sock_fd.fd, TCP_BACKLOG) == -1)
goto listen_error;
/* create an fdset to keep track of our file descriptors */
if ((src->fdset = gst_poll_new (GST_POLL_MODE_AUTO, TRUE)) == NULL)
goto socket_pair;
gst_poll_add_fd (src->fdset, &src->server_sock_fd);
GST_DEBUG_OBJECT (src, "received client");
GST_OBJECT_FLAG_SET (src, GST_TCP_SERVER_SRC_OPEN);
@ -425,12 +398,6 @@ gst_tcp_server_src_start (GstBaseSrc * bsrc)
return TRUE;
/* ERRORS */
socket_pair:
{
GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ_WRITE, (NULL),
GST_ERROR_SYSTEM);
return FALSE;
}
socket_error:
{
GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL), GST_ERROR_SYSTEM);
@ -440,6 +407,7 @@ sock_opt:
{
GST_ELEMENT_ERROR (src, RESOURCE, SETTINGS, (NULL),
("Could not setsockopt: %s", g_strerror (errno)));
gst_tcp_socket_close (&src->server_sock_fd);
return FALSE;
}
host_error:
@ -465,6 +433,13 @@ listen_error:
("Could not listen on server socket: %s", g_strerror (errno)));
return FALSE;
}
socket_pair:
{
GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ_WRITE, (NULL),
GST_ERROR_SYSTEM);
gst_tcp_socket_close (&src->server_sock_fd);
return FALSE;
}
}
static gboolean
@ -472,20 +447,13 @@ gst_tcp_server_src_stop (GstBaseSrc * bsrc)
{
GstTCPServerSrc *src = GST_TCP_SERVER_SRC (bsrc);
if (src->server_sock_fd != -1) {
close (src->server_sock_fd);
src->server_sock_fd = -1;
}
if (src->client_sock_fd != -1) {
close (src->client_sock_fd);
src->client_sock_fd = -1;
}
GST_OBJECT_FLAG_UNSET (src, GST_TCP_SERVER_SRC_OPEN);
gst_poll_free (src->fdset);
src->fdset = NULL;
close (READ_SOCKET (src));
close (WRITE_SOCKET (src));
READ_SOCKET (src) = -1;
WRITE_SOCKET (src) = -1;
gst_tcp_socket_close (&src->server_sock_fd);
gst_tcp_socket_close (&src->client_sock_fd);
GST_OBJECT_FLAG_UNSET (src, GST_TCP_SERVER_SRC_OPEN);
return TRUE;
}
@ -496,7 +464,7 @@ gst_tcp_server_src_unlock (GstBaseSrc * bsrc)
{
GstTCPServerSrc *src = GST_TCP_SERVER_SRC (bsrc);
SEND_COMMAND (src, CONTROL_STOP);
gst_poll_set_flushing (src->fdset, TRUE);
return TRUE;
}

View file

@ -65,14 +65,14 @@ struct _GstTCPServerSrc {
int server_port;
gchar *host;
struct sockaddr_in server_sin;
int server_sock_fd;
GstPollFD server_sock_fd;
/* client information */
struct sockaddr_in client_sin;
socklen_t client_sin_len;
int client_sock_fd;
GstPollFD client_sock_fd;
int control_fds[2];
GstPoll *fdset;
GstTCPProtocol protocol; /* protocol used for reading data */
gboolean caps_received; /* if we have received caps yet */