From d1931e499d9e8a2a4835a9e0e052d6371341d58a Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Sun, 27 Jun 2004 11:15:23 +0000 Subject: [PATCH] gst/tcp/: More multifdsink fixes, more recovery policy fixes. Original commit message from CVS: * gst/tcp/gstmultifdsink.c: (gst_recover_policy_get_type), (gst_multifdsink_class_init), (gst_multifdsink_add), (gst_multifdsink_remove), (gst_multifdsink_clear), (gst_multifdsink_client_remove), (gst_multifdsink_handle_client_read), (gst_multifdsink_client_queue_data), (gst_multifdsink_client_queue_caps), (gst_multifdsink_client_queue_buffer), (gst_multifdsink_handle_client_write), (gst_multifdsink_recover_client), (gst_multifdsink_queue_buffer), (gst_multifdsink_handle_clients), (gst_multifdsink_thread), (gst_multifdsink_init_send), (gst_multifdsink_close): * gst/tcp/gstmultifdsink.h: * gst/tcp/gsttcpserversink.c: (gst_tcpserversink_handle_server_read), (gst_tcpserversink_handle_select), (gst_tcpserversink_close): More multifdsink fixes, more recovery policy fixes. Removed stupid g_print --- ChangeLog | 21 +++++++ gst/tcp/gstmultifdsink.c | 122 +++++++++++++++++++++++++++++++++---- gst/tcp/gstmultifdsink.h | 12 ++++ gst/tcp/gsttcpserversink.c | 21 +------ 4 files changed, 145 insertions(+), 31 deletions(-) diff --git a/ChangeLog b/ChangeLog index 99f38cac1d..fcda8ec79c 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,24 @@ +2004-06-27 Wim Taymans + + * gst/tcp/gstmultifdsink.c: (gst_recover_policy_get_type), + (gst_multifdsink_class_init), (gst_multifdsink_add), + (gst_multifdsink_remove), (gst_multifdsink_clear), + (gst_multifdsink_client_remove), + (gst_multifdsink_handle_client_read), + (gst_multifdsink_client_queue_data), + (gst_multifdsink_client_queue_caps), + (gst_multifdsink_client_queue_buffer), + (gst_multifdsink_handle_client_write), + (gst_multifdsink_recover_client), (gst_multifdsink_queue_buffer), + (gst_multifdsink_handle_clients), (gst_multifdsink_thread), + (gst_multifdsink_init_send), (gst_multifdsink_close): + * gst/tcp/gstmultifdsink.h: + * gst/tcp/gsttcpserversink.c: + (gst_tcpserversink_handle_server_read), + (gst_tcpserversink_handle_select), (gst_tcpserversink_close): + More multifdsink fixes, more recovery policy fixes. + Removed stupid g_print + 2004-06-26 Wim Taymans * gst/tcp/Makefile.am: diff --git a/gst/tcp/gstmultifdsink.c b/gst/tcp/gstmultifdsink.c index f65803a3b2..ab4726c0a5 100644 --- a/gst/tcp/gstmultifdsink.c +++ b/gst/tcp/gstmultifdsink.c @@ -61,6 +61,11 @@ GST_DEBUG_CATEGORY (multifdsink_debug); /* MultiFdSink signals and args */ enum { + /* methods */ + SIGNAL_ADD, + SIGNAL_REMOVE, + SIGNAL_CLEAR, + /* signals */ SIGNAL_CLIENT_ADDED, SIGNAL_CLIENT_REMOVED, LAST_SIGNAL @@ -105,9 +110,12 @@ gst_recover_policy_get_type (void) } static void gst_multifdsink_base_init (gpointer g_class); -static void gst_multifdsink_class_init (GstMultiFdSink * klass); +static void gst_multifdsink_class_init (GstMultiFdSinkClass * klass); static void gst_multifdsink_init (GstMultiFdSink * multifdsink); +static void gst_multifdsink_client_remove (GstMultiFdSink * sink, + GstTCPClient * client); + static void gst_multifdsink_chain (GstPad * pad, GstData * _data); static GstElementStateReturn gst_multifdsink_change_state (GstElement * element); @@ -158,7 +166,7 @@ gst_multifdsink_base_init (gpointer g_class) } static void -gst_multifdsink_class_init (GstMultiFdSink * klass) +gst_multifdsink_class_init (GstMultiFdSinkClass * klass) { GObjectClass *gobject_class; GstElementClass *gstelement_class; @@ -189,6 +197,18 @@ gst_multifdsink_class_init (GstMultiFdSink * klass) "How to recover when client reaches the soft max", GST_TYPE_RECOVER_POLICY, GST_RECOVER_POLICY_NONE, G_PARAM_READWRITE)); + gst_multifdsink_signals[SIGNAL_ADD] = + g_signal_new ("add", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST, + G_STRUCT_OFFSET (GstMultiFdSinkClass, add), + NULL, NULL, g_cclosure_marshal_VOID__INT, G_TYPE_NONE, 1, G_TYPE_INT); + gst_multifdsink_signals[SIGNAL_REMOVE] = + g_signal_new ("remove", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST, + G_STRUCT_OFFSET (GstMultiFdSinkClass, remove), + NULL, NULL, g_cclosure_marshal_VOID__INT, G_TYPE_NONE, 1, G_TYPE_INT); + gst_multifdsink_signals[SIGNAL_CLEAR] = + g_signal_new ("clear", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST, + G_STRUCT_OFFSET (GstMultiFdSinkClass, clear), + NULL, NULL, g_cclosure_marshal_VOID__INT, G_TYPE_NONE, 0); gst_multifdsink_signals[SIGNAL_CLIENT_ADDED] = g_signal_new ("client-added", G_TYPE_FROM_CLASS (klass), @@ -206,6 +226,10 @@ gst_multifdsink_class_init (GstMultiFdSink * klass) gstelement_class->change_state = gst_multifdsink_change_state; + klass->add = gst_multifdsink_add; + klass->remove = gst_multifdsink_remove; + klass->clear = gst_multifdsink_clear; + GST_DEBUG_CATEGORY_INIT (multifdsink_debug, "multifdsink", 0, "FD sink"); } @@ -243,6 +267,67 @@ gst_multifdsink_debug_fdset (GstMultiFdSink * sink, fd_set * testfds) } } +void +gst_multifdsink_add (GstMultiFdSink * sink, int fd) +{ + GstTCPClient *client; + + /* create client datastructure */ + client = g_new0 (GstTCPClient, 1); + client->fd = fd; + client->bufpos = -1; + client->bufoffset = 0; + client->sending = NULL; + + g_mutex_lock (sink->clientslock); + sink->clients = g_list_prepend (sink->clients, client); + g_mutex_unlock (sink->clientslock); + + /* we always read from a client */ + FD_SET (fd, &sink->readfds); + + /* set the socket to non blocking */ + fcntl (fd, F_SETFL, O_NONBLOCK); + + g_signal_emit (G_OBJECT (sink), + gst_multifdsink_signals[SIGNAL_CLIENT_ADDED], 0, NULL, fd); +} + +void +gst_multifdsink_remove (GstMultiFdSink * sink, int fd) +{ + GList *clients; + + g_mutex_lock (sink->clientslock); + /* loop over the clients to find the one with the fd */ + for (clients = sink->clients; clients; clients = g_list_next (clients)) { + GstTCPClient *client; + + client = (GstTCPClient *) clients->data; + + if (client->fd == fd) { + gst_multifdsink_client_remove (sink, client); + break; + } + } + g_mutex_unlock (sink->clientslock); +} + +void +gst_multifdsink_clear (GstMultiFdSink * sink) +{ + GList *clients; + + g_mutex_lock (sink->clientslock); + for (clients = sink->clients; clients; clients = g_list_next (clients)) { + GstTCPClient *client; + + client = (GstTCPClient *) clients->data; + gst_multifdsink_client_remove (sink, client); + } + g_mutex_unlock (sink->clientslock); +} + static void gst_multifdsink_client_remove (GstMultiFdSink * sink, GstTCPClient * client) { @@ -440,11 +525,10 @@ gst_multifdsink_handle_client_write (GstMultiFdSink * sink, * another thread */ buf = g_array_index (sink->bufqueue, GstBuffer *, client->bufpos); client->bufpos--; - gst_buffer_ref (buf); + /* queueing a buffer will ref it */ gst_multifdsink_client_queue_buffer (sink, client, buf); - /* it is safe to unref now as queueing a buffer will ref it */ - gst_buffer_unref (buf); + /* need to start from the first byte for this new buffer */ client->bufoffset = 0; } @@ -492,9 +576,15 @@ gst_multifdsink_handle_client_write (GstMultiFdSink * sink, return TRUE; } -static void +/* calculate the new position for a client after recovery. This function + * does not update the client position but merely returns the required + * position. + */ +static gint gst_multifdsink_recover_client (GstMultiFdSink * sink, GstTCPClient * client) { + gint newbufpos; + /* FIXME: implement recover procedure here, like moving the position to * the next keyframe, dropping buffers back to the beginning of the queue, * stuff like that... */ @@ -504,20 +594,25 @@ gst_multifdsink_recover_client (GstMultiFdSink * sink, GstTCPClient * client) case GST_RECOVER_POLICY_NONE: /* do nothing, client will catch up or get kicked out when it reaches * the hard max */ + newbufpos = client->bufpos; break; case GST_RECOVER_POLICY_RESYNC_START: /* move to beginning of queue */ - client->bufpos = -1; + newbufpos = -1; break; case GST_RECOVER_POLICY_RESYNC_SOFT: /* move to beginning of soft max */ - client->bufpos = sink->buffers_soft_max; + newbufpos = sink->buffers_soft_max; break; case GST_RECOVER_POLICY_RESYNC_KEYFRAME: /* FIXME, find keyframe in buffers */ - client->bufpos = sink->buffers_soft_max; + newbufpos = sink->buffers_soft_max; + break; + default: + newbufpos = sink->buffers_soft_max; break; } + return newbufpos; } /* Queue a buffer on the global queue. @@ -566,7 +661,13 @@ gst_multifdsink_queue_buffer (GstMultiFdSink * sink, GstBuffer * buf) client, client->fd, client->bufpos); /* check soft max if needed, recover client */ if (sink->buffers_soft_max > 0 && client->bufpos >= sink->buffers_soft_max) { - gst_multifdsink_recover_client (sink, client); + gint newpos; + + newpos = gst_multifdsink_recover_client (sink, client); + if (newpos != client->bufpos) { + client->bufpos = newpos; + client->discont = TRUE; + } } /* check hard max, remove client */ if (sink->buffers_max > 0 && client->bufpos >= sink->buffers_max) { @@ -615,7 +716,6 @@ gst_multifdsink_queue_buffer (GstMultiFdSink * sink, GstBuffer * buf) gst_buffer_unref (old); } sink->buffers_queued = max_buffer_usage; - g_print ("%d\n", max_buffer_usage); g_mutex_unlock (sink->clientslock); /* and send a signal to thread if fd_set changed */ diff --git a/gst/tcp/gstmultifdsink.h b/gst/tcp/gstmultifdsink.h index 5eeecfe97f..4746a315f7 100644 --- a/gst/tcp/gstmultifdsink.h +++ b/gst/tcp/gstmultifdsink.h @@ -84,6 +84,8 @@ typedef struct { GList *sending; /* the buffers we need to send */ gint bufoffset; /* offset in the first buffer */ + gboolean discont; + GstTCPProtocolType protocol; gboolean caps_sent; @@ -125,6 +127,12 @@ struct _GstMultiFdSink { struct _GstMultiFdSinkClass { GstElementClass parent_class; + /* element methods */ + void (*add) (GstMultiFdSink *sink, int fd); + void (*remove) (GstMultiFdSink *sink, int fd); + void (*clear) (GstMultiFdSink *sink); + + /* vtable */ gboolean (*init) (GstMultiFdSink *sink); gboolean (*select) (GstMultiFdSink *sink, fd_set *readfds, fd_set *writefds); gboolean (*close) (GstMultiFdSink *sink); @@ -136,6 +144,10 @@ struct _GstMultiFdSinkClass { GType gst_multifdsink_get_type (void); +void gst_multifdsink_add (GstMultiFdSink *sink, int fd); +void gst_multifdsink_remove (GstMultiFdSink *sink, int fd); +void gst_multifdsink_clear (GstMultiFdSink *sink); + #ifdef __cplusplus } diff --git a/gst/tcp/gsttcpserversink.c b/gst/tcp/gsttcpserversink.c index 591f953318..b4499836cf 100644 --- a/gst/tcp/gsttcpserversink.c +++ b/gst/tcp/gsttcpserversink.c @@ -70,8 +70,6 @@ static void gst_tcpserversink_get_property (GObject * object, guint prop_id, static GstMultiFdSinkClass *parent_class = NULL; -//static guint gst_tcpserversink_signals[LAST_SIGNAL] = { 0 }; - GType gst_tcpserversink_get_type (void) { @@ -156,8 +154,6 @@ gst_tcpserversink_handle_server_read (GstTCPServerSink * sink) int client_sock_fd; struct sockaddr_in client_address; int client_address_len; - GstTCPClient *client; - GstMultiFdSink *parent = GST_MULTIFDSINK (sink); client_sock_fd = accept (sink->server_sock_fd, (struct sockaddr *) &client_address, @@ -168,22 +164,7 @@ gst_tcpserversink_handle_server_read (GstTCPServerSink * sink) return FALSE; } - /* create client datastructure */ - client = g_new0 (GstTCPClient, 1); - client->fd = client_sock_fd; - client->bufpos = -1; - client->bufoffset = 0; - client->sending = NULL; - - g_mutex_lock (parent->clientslock); - parent->clients = g_list_prepend (parent->clients, client); - g_mutex_unlock (parent->clientslock); - - /* we always read from a client */ - FD_SET (client_sock_fd, &parent->readfds); - - /* set the socket to non blocking */ - fcntl (client_sock_fd, F_SETFL, O_NONBLOCK); + gst_multifdsink_add (GST_MULTIFDSINK (sink), client_sock_fd); GST_DEBUG_OBJECT (sink, "added new client ip %s with fd %d", inet_ntoa (client_address.sin_addr), client_sock_fd);