gst/tcp/: More multifdsink fixes, more recovery policy fixes.

Original commit message from CVS:
* gst/tcp/gstmultifdsink.c: (gst_recover_policy_get_type),
(gst_multifdsink_class_init), (gst_multifdsink_add),
(gst_multifdsink_remove), (gst_multifdsink_clear),
(gst_multifdsink_client_remove),
(gst_multifdsink_handle_client_read),
(gst_multifdsink_client_queue_data),
(gst_multifdsink_client_queue_caps),
(gst_multifdsink_client_queue_buffer),
(gst_multifdsink_handle_client_write),
(gst_multifdsink_recover_client), (gst_multifdsink_queue_buffer),
(gst_multifdsink_handle_clients), (gst_multifdsink_thread),
(gst_multifdsink_init_send), (gst_multifdsink_close):
* gst/tcp/gstmultifdsink.h:
* gst/tcp/gsttcpserversink.c:
(gst_tcpserversink_handle_server_read),
(gst_tcpserversink_handle_select), (gst_tcpserversink_close):
More multifdsink fixes, more recovery policy fixes.
Removed stupid g_print
This commit is contained in:
Wim Taymans 2004-06-27 11:15:23 +00:00
parent 2c2b65c4b0
commit d1931e499d
4 changed files with 145 additions and 31 deletions

View file

@ -1,3 +1,24 @@
2004-06-27 Wim Taymans <wim@fluendo.com>
* gst/tcp/gstmultifdsink.c: (gst_recover_policy_get_type),
(gst_multifdsink_class_init), (gst_multifdsink_add),
(gst_multifdsink_remove), (gst_multifdsink_clear),
(gst_multifdsink_client_remove),
(gst_multifdsink_handle_client_read),
(gst_multifdsink_client_queue_data),
(gst_multifdsink_client_queue_caps),
(gst_multifdsink_client_queue_buffer),
(gst_multifdsink_handle_client_write),
(gst_multifdsink_recover_client), (gst_multifdsink_queue_buffer),
(gst_multifdsink_handle_clients), (gst_multifdsink_thread),
(gst_multifdsink_init_send), (gst_multifdsink_close):
* gst/tcp/gstmultifdsink.h:
* gst/tcp/gsttcpserversink.c:
(gst_tcpserversink_handle_server_read),
(gst_tcpserversink_handle_select), (gst_tcpserversink_close):
More multifdsink fixes, more recovery policy fixes.
Removed stupid g_print
2004-06-26 Wim Taymans <wim@fluendo.com> 2004-06-26 Wim Taymans <wim@fluendo.com>
* gst/tcp/Makefile.am: * gst/tcp/Makefile.am:

View file

@ -61,6 +61,11 @@ GST_DEBUG_CATEGORY (multifdsink_debug);
/* MultiFdSink signals and args */ /* MultiFdSink signals and args */
enum enum
{ {
/* methods */
SIGNAL_ADD,
SIGNAL_REMOVE,
SIGNAL_CLEAR,
/* signals */
SIGNAL_CLIENT_ADDED, SIGNAL_CLIENT_ADDED,
SIGNAL_CLIENT_REMOVED, SIGNAL_CLIENT_REMOVED,
LAST_SIGNAL LAST_SIGNAL
@ -105,9 +110,12 @@ gst_recover_policy_get_type (void)
} }
static void gst_multifdsink_base_init (gpointer g_class); static void gst_multifdsink_base_init (gpointer g_class);
static void gst_multifdsink_class_init (GstMultiFdSink * klass); static void gst_multifdsink_class_init (GstMultiFdSinkClass * klass);
static void gst_multifdsink_init (GstMultiFdSink * multifdsink); static void gst_multifdsink_init (GstMultiFdSink * multifdsink);
static void gst_multifdsink_client_remove (GstMultiFdSink * sink,
GstTCPClient * client);
static void gst_multifdsink_chain (GstPad * pad, GstData * _data); static void gst_multifdsink_chain (GstPad * pad, GstData * _data);
static GstElementStateReturn gst_multifdsink_change_state (GstElement * static GstElementStateReturn gst_multifdsink_change_state (GstElement *
element); element);
@ -158,7 +166,7 @@ gst_multifdsink_base_init (gpointer g_class)
} }
static void static void
gst_multifdsink_class_init (GstMultiFdSink * klass) gst_multifdsink_class_init (GstMultiFdSinkClass * klass)
{ {
GObjectClass *gobject_class; GObjectClass *gobject_class;
GstElementClass *gstelement_class; GstElementClass *gstelement_class;
@ -189,6 +197,18 @@ gst_multifdsink_class_init (GstMultiFdSink * klass)
"How to recover when client reaches the soft max", "How to recover when client reaches the soft max",
GST_TYPE_RECOVER_POLICY, GST_RECOVER_POLICY_NONE, G_PARAM_READWRITE)); GST_TYPE_RECOVER_POLICY, GST_RECOVER_POLICY_NONE, G_PARAM_READWRITE));
gst_multifdsink_signals[SIGNAL_ADD] =
g_signal_new ("add", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
G_STRUCT_OFFSET (GstMultiFdSinkClass, add),
NULL, NULL, g_cclosure_marshal_VOID__INT, G_TYPE_NONE, 1, G_TYPE_INT);
gst_multifdsink_signals[SIGNAL_REMOVE] =
g_signal_new ("remove", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
G_STRUCT_OFFSET (GstMultiFdSinkClass, remove),
NULL, NULL, g_cclosure_marshal_VOID__INT, G_TYPE_NONE, 1, G_TYPE_INT);
gst_multifdsink_signals[SIGNAL_CLEAR] =
g_signal_new ("clear", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
G_STRUCT_OFFSET (GstMultiFdSinkClass, clear),
NULL, NULL, g_cclosure_marshal_VOID__INT, G_TYPE_NONE, 0);
gst_multifdsink_signals[SIGNAL_CLIENT_ADDED] = gst_multifdsink_signals[SIGNAL_CLIENT_ADDED] =
g_signal_new ("client-added", G_TYPE_FROM_CLASS (klass), g_signal_new ("client-added", G_TYPE_FROM_CLASS (klass),
@ -206,6 +226,10 @@ gst_multifdsink_class_init (GstMultiFdSink * klass)
gstelement_class->change_state = gst_multifdsink_change_state; gstelement_class->change_state = gst_multifdsink_change_state;
klass->add = gst_multifdsink_add;
klass->remove = gst_multifdsink_remove;
klass->clear = gst_multifdsink_clear;
GST_DEBUG_CATEGORY_INIT (multifdsink_debug, "multifdsink", 0, "FD sink"); GST_DEBUG_CATEGORY_INIT (multifdsink_debug, "multifdsink", 0, "FD sink");
} }
@ -243,6 +267,67 @@ gst_multifdsink_debug_fdset (GstMultiFdSink * sink, fd_set * testfds)
} }
} }
void
gst_multifdsink_add (GstMultiFdSink * sink, int fd)
{
GstTCPClient *client;
/* create client datastructure */
client = g_new0 (GstTCPClient, 1);
client->fd = fd;
client->bufpos = -1;
client->bufoffset = 0;
client->sending = NULL;
g_mutex_lock (sink->clientslock);
sink->clients = g_list_prepend (sink->clients, client);
g_mutex_unlock (sink->clientslock);
/* we always read from a client */
FD_SET (fd, &sink->readfds);
/* set the socket to non blocking */
fcntl (fd, F_SETFL, O_NONBLOCK);
g_signal_emit (G_OBJECT (sink),
gst_multifdsink_signals[SIGNAL_CLIENT_ADDED], 0, NULL, fd);
}
void
gst_multifdsink_remove (GstMultiFdSink * sink, int fd)
{
GList *clients;
g_mutex_lock (sink->clientslock);
/* loop over the clients to find the one with the fd */
for (clients = sink->clients; clients; clients = g_list_next (clients)) {
GstTCPClient *client;
client = (GstTCPClient *) clients->data;
if (client->fd == fd) {
gst_multifdsink_client_remove (sink, client);
break;
}
}
g_mutex_unlock (sink->clientslock);
}
void
gst_multifdsink_clear (GstMultiFdSink * sink)
{
GList *clients;
g_mutex_lock (sink->clientslock);
for (clients = sink->clients; clients; clients = g_list_next (clients)) {
GstTCPClient *client;
client = (GstTCPClient *) clients->data;
gst_multifdsink_client_remove (sink, client);
}
g_mutex_unlock (sink->clientslock);
}
static void static void
gst_multifdsink_client_remove (GstMultiFdSink * sink, GstTCPClient * client) gst_multifdsink_client_remove (GstMultiFdSink * sink, GstTCPClient * client)
{ {
@ -440,11 +525,10 @@ gst_multifdsink_handle_client_write (GstMultiFdSink * sink,
* another thread */ * another thread */
buf = g_array_index (sink->bufqueue, GstBuffer *, client->bufpos); buf = g_array_index (sink->bufqueue, GstBuffer *, client->bufpos);
client->bufpos--; client->bufpos--;
gst_buffer_ref (buf);
/* queueing a buffer will ref it */
gst_multifdsink_client_queue_buffer (sink, client, buf); gst_multifdsink_client_queue_buffer (sink, client, buf);
/* it is safe to unref now as queueing a buffer will ref it */
gst_buffer_unref (buf);
/* need to start from the first byte for this new buffer */ /* need to start from the first byte for this new buffer */
client->bufoffset = 0; client->bufoffset = 0;
} }
@ -492,9 +576,15 @@ gst_multifdsink_handle_client_write (GstMultiFdSink * sink,
return TRUE; return TRUE;
} }
static void /* calculate the new position for a client after recovery. This function
* does not update the client position but merely returns the required
* position.
*/
static gint
gst_multifdsink_recover_client (GstMultiFdSink * sink, GstTCPClient * client) gst_multifdsink_recover_client (GstMultiFdSink * sink, GstTCPClient * client)
{ {
gint newbufpos;
/* FIXME: implement recover procedure here, like moving the position to /* FIXME: implement recover procedure here, like moving the position to
* the next keyframe, dropping buffers back to the beginning of the queue, * the next keyframe, dropping buffers back to the beginning of the queue,
* stuff like that... */ * stuff like that... */
@ -504,20 +594,25 @@ gst_multifdsink_recover_client (GstMultiFdSink * sink, GstTCPClient * client)
case GST_RECOVER_POLICY_NONE: case GST_RECOVER_POLICY_NONE:
/* do nothing, client will catch up or get kicked out when it reaches /* do nothing, client will catch up or get kicked out when it reaches
* the hard max */ * the hard max */
newbufpos = client->bufpos;
break; break;
case GST_RECOVER_POLICY_RESYNC_START: case GST_RECOVER_POLICY_RESYNC_START:
/* move to beginning of queue */ /* move to beginning of queue */
client->bufpos = -1; newbufpos = -1;
break; break;
case GST_RECOVER_POLICY_RESYNC_SOFT: case GST_RECOVER_POLICY_RESYNC_SOFT:
/* move to beginning of soft max */ /* move to beginning of soft max */
client->bufpos = sink->buffers_soft_max; newbufpos = sink->buffers_soft_max;
break; break;
case GST_RECOVER_POLICY_RESYNC_KEYFRAME: case GST_RECOVER_POLICY_RESYNC_KEYFRAME:
/* FIXME, find keyframe in buffers */ /* FIXME, find keyframe in buffers */
client->bufpos = sink->buffers_soft_max; newbufpos = sink->buffers_soft_max;
break;
default:
newbufpos = sink->buffers_soft_max;
break; break;
} }
return newbufpos;
} }
/* Queue a buffer on the global queue. /* Queue a buffer on the global queue.
@ -566,7 +661,13 @@ gst_multifdsink_queue_buffer (GstMultiFdSink * sink, GstBuffer * buf)
client, client->fd, client->bufpos); client, client->fd, client->bufpos);
/* check soft max if needed, recover client */ /* check soft max if needed, recover client */
if (sink->buffers_soft_max > 0 && client->bufpos >= sink->buffers_soft_max) { if (sink->buffers_soft_max > 0 && client->bufpos >= sink->buffers_soft_max) {
gst_multifdsink_recover_client (sink, client); gint newpos;
newpos = gst_multifdsink_recover_client (sink, client);
if (newpos != client->bufpos) {
client->bufpos = newpos;
client->discont = TRUE;
}
} }
/* check hard max, remove client */ /* check hard max, remove client */
if (sink->buffers_max > 0 && client->bufpos >= sink->buffers_max) { if (sink->buffers_max > 0 && client->bufpos >= sink->buffers_max) {
@ -615,7 +716,6 @@ gst_multifdsink_queue_buffer (GstMultiFdSink * sink, GstBuffer * buf)
gst_buffer_unref (old); gst_buffer_unref (old);
} }
sink->buffers_queued = max_buffer_usage; sink->buffers_queued = max_buffer_usage;
g_print ("%d\n", max_buffer_usage);
g_mutex_unlock (sink->clientslock); g_mutex_unlock (sink->clientslock);
/* and send a signal to thread if fd_set changed */ /* and send a signal to thread if fd_set changed */

View file

@ -84,6 +84,8 @@ typedef struct {
GList *sending; /* the buffers we need to send */ GList *sending; /* the buffers we need to send */
gint bufoffset; /* offset in the first buffer */ gint bufoffset; /* offset in the first buffer */
gboolean discont;
GstTCPProtocolType protocol; GstTCPProtocolType protocol;
gboolean caps_sent; gboolean caps_sent;
@ -125,6 +127,12 @@ struct _GstMultiFdSink {
struct _GstMultiFdSinkClass { struct _GstMultiFdSinkClass {
GstElementClass parent_class; GstElementClass parent_class;
/* element methods */
void (*add) (GstMultiFdSink *sink, int fd);
void (*remove) (GstMultiFdSink *sink, int fd);
void (*clear) (GstMultiFdSink *sink);
/* vtable */
gboolean (*init) (GstMultiFdSink *sink); gboolean (*init) (GstMultiFdSink *sink);
gboolean (*select) (GstMultiFdSink *sink, fd_set *readfds, fd_set *writefds); gboolean (*select) (GstMultiFdSink *sink, fd_set *readfds, fd_set *writefds);
gboolean (*close) (GstMultiFdSink *sink); gboolean (*close) (GstMultiFdSink *sink);
@ -136,6 +144,10 @@ struct _GstMultiFdSinkClass {
GType gst_multifdsink_get_type (void); GType gst_multifdsink_get_type (void);
void gst_multifdsink_add (GstMultiFdSink *sink, int fd);
void gst_multifdsink_remove (GstMultiFdSink *sink, int fd);
void gst_multifdsink_clear (GstMultiFdSink *sink);
#ifdef __cplusplus #ifdef __cplusplus
} }

View file

@ -70,8 +70,6 @@ static void gst_tcpserversink_get_property (GObject * object, guint prop_id,
static GstMultiFdSinkClass *parent_class = NULL; static GstMultiFdSinkClass *parent_class = NULL;
//static guint gst_tcpserversink_signals[LAST_SIGNAL] = { 0 };
GType GType
gst_tcpserversink_get_type (void) gst_tcpserversink_get_type (void)
{ {
@ -156,8 +154,6 @@ gst_tcpserversink_handle_server_read (GstTCPServerSink * sink)
int client_sock_fd; int client_sock_fd;
struct sockaddr_in client_address; struct sockaddr_in client_address;
int client_address_len; int client_address_len;
GstTCPClient *client;
GstMultiFdSink *parent = GST_MULTIFDSINK (sink);
client_sock_fd = client_sock_fd =
accept (sink->server_sock_fd, (struct sockaddr *) &client_address, accept (sink->server_sock_fd, (struct sockaddr *) &client_address,
@ -168,22 +164,7 @@ gst_tcpserversink_handle_server_read (GstTCPServerSink * sink)
return FALSE; return FALSE;
} }
/* create client datastructure */ gst_multifdsink_add (GST_MULTIFDSINK (sink), client_sock_fd);
client = g_new0 (GstTCPClient, 1);
client->fd = client_sock_fd;
client->bufpos = -1;
client->bufoffset = 0;
client->sending = NULL;
g_mutex_lock (parent->clientslock);
parent->clients = g_list_prepend (parent->clients, client);
g_mutex_unlock (parent->clientslock);
/* we always read from a client */
FD_SET (client_sock_fd, &parent->readfds);
/* set the socket to non blocking */
fcntl (client_sock_fd, F_SETFL, O_NONBLOCK);
GST_DEBUG_OBJECT (sink, "added new client ip %s with fd %d", GST_DEBUG_OBJECT (sink, "added new client ip %s with fd %d",
inet_ntoa (client_address.sin_addr), client_sock_fd); inet_ntoa (client_address.sin_addr), client_sock_fd);