gst/tcp/gstmultifdsink.*: Add support for remuve_flush.

Original commit message from CVS:
* gst/tcp/gstmultifdsink.c: (gst_client_status_get_type),
(gst_multi_fd_sink_class_init), (gst_multi_fd_sink_add_full),
(gst_multi_fd_sink_remove_flush),
(gst_multi_fd_sink_remove_client_link),
(gst_multi_fd_sink_handle_client_write),
(gst_multi_fd_sink_handle_clients):
* gst/tcp/gstmultifdsink.h:
Add support for remuve_flush.
This commit is contained in:
Wim Taymans 2007-06-05 16:00:33 +00:00
parent 80c1e3d27c
commit 56e2a6b516
3 changed files with 108 additions and 16 deletions

View file

@ -1,3 +1,14 @@
2007-06-05 Wim Taymans <wim@fluendo.com>
* gst/tcp/gstmultifdsink.c: (gst_client_status_get_type),
(gst_multi_fd_sink_class_init), (gst_multi_fd_sink_add_full),
(gst_multi_fd_sink_remove_flush),
(gst_multi_fd_sink_remove_client_link),
(gst_multi_fd_sink_handle_client_write),
(gst_multi_fd_sink_handle_clients):
* gst/tcp/gstmultifdsink.h:
Add support for remuve_flush.
2007-06-05 Wim Taymans <wim@fluendo.com> 2007-06-05 Wim Taymans <wim@fluendo.com>
* docs/design/draft-keyframe-force.txt: * docs/design/draft-keyframe-force.txt:

View file

@ -174,6 +174,7 @@ enum
SIGNAL_ADD, SIGNAL_ADD,
SIGNAL_ADD_BURST, SIGNAL_ADD_BURST,
SIGNAL_REMOVE, SIGNAL_REMOVE,
SIGNAL_REMOVE_FLUSH,
SIGNAL_CLEAR, SIGNAL_CLEAR,
SIGNAL_GET_STATS, SIGNAL_GET_STATS,
@ -317,6 +318,7 @@ gst_client_status_get_type (void)
{GST_CLIENT_STATUS_SLOW, "Too slow", "slow"}, {GST_CLIENT_STATUS_SLOW, "Too slow", "slow"},
{GST_CLIENT_STATUS_ERROR, "Error", "error"}, {GST_CLIENT_STATUS_ERROR, "Error", "error"},
{GST_CLIENT_STATUS_DUPLICATE, "Duplicate", "duplicate"}, {GST_CLIENT_STATUS_DUPLICATE, "Duplicate", "duplicate"},
{GST_CLIENT_STATUS_FLUSHING, "Flushing", "flushing"},
{0, NULL, NULL}, {0, NULL, NULL},
}; };
@ -505,6 +507,18 @@ gst_multi_fd_sink_class_init (GstMultiFdSinkClass * klass)
g_signal_new ("remove", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST, g_signal_new ("remove", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
G_STRUCT_OFFSET (GstMultiFdSinkClass, remove), G_STRUCT_OFFSET (GstMultiFdSinkClass, remove),
NULL, NULL, gst_tcp_marshal_VOID__INT, G_TYPE_NONE, 1, G_TYPE_INT); NULL, NULL, gst_tcp_marshal_VOID__INT, G_TYPE_NONE, 1, G_TYPE_INT);
/**
* GstMultiFdSink::remove_flush:
* @gstmultifdsink: the multifdsink element to emit this signal on
* @fd: the file descriptor to remove from multifdsink
*
* Remove the given open file descriptor from multifdsink after flushing all
* the pending data to the fd.
*/
gst_multi_fd_sink_signals[SIGNAL_REMOVE_FLUSH] =
g_signal_new ("remove-flush", G_TYPE_FROM_CLASS (klass),
G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstMultiFdSinkClass, remove_flush),
NULL, NULL, gst_tcp_marshal_VOID__INT, G_TYPE_NONE, 1, G_TYPE_INT);
/** /**
* GstMultiFdSink::clear: * GstMultiFdSink::clear:
* @gstmultifdsink: the multifdsink element to emit this signal on * @gstmultifdsink: the multifdsink element to emit this signal on
@ -601,6 +615,7 @@ gst_multi_fd_sink_class_init (GstMultiFdSinkClass * klass)
klass->add = GST_DEBUG_FUNCPTR (gst_multi_fd_sink_add); klass->add = GST_DEBUG_FUNCPTR (gst_multi_fd_sink_add);
klass->add_full = GST_DEBUG_FUNCPTR (gst_multi_fd_sink_add_full); klass->add_full = GST_DEBUG_FUNCPTR (gst_multi_fd_sink_add_full);
klass->remove = GST_DEBUG_FUNCPTR (gst_multi_fd_sink_remove); klass->remove = GST_DEBUG_FUNCPTR (gst_multi_fd_sink_remove);
klass->remove_flush = GST_DEBUG_FUNCPTR (gst_multi_fd_sink_remove_flush);
klass->clear = GST_DEBUG_FUNCPTR (gst_multi_fd_sink_clear); klass->clear = GST_DEBUG_FUNCPTR (gst_multi_fd_sink_clear);
klass->get_stats = GST_DEBUG_FUNCPTR (gst_multi_fd_sink_get_stats); klass->get_stats = GST_DEBUG_FUNCPTR (gst_multi_fd_sink_get_stats);
@ -678,6 +693,7 @@ gst_multi_fd_sink_add_full (GstMultiFdSink * sink, int fd,
client->fd.fd = fd; client->fd.fd = fd;
client->status = GST_CLIENT_STATUS_OK; client->status = GST_CLIENT_STATUS_OK;
client->bufpos = -1; client->bufpos = -1;
client->flushcount = -1;
client->bufoffset = 0; client->bufoffset = 0;
client->sending = NULL; client->sending = NULL;
client->bytes_sent = 0; client->bytes_sent = 0;
@ -796,6 +812,40 @@ done:
CLIENTS_UNLOCK (sink); CLIENTS_UNLOCK (sink);
} }
/* "remove-flush" signal implementation */
void
gst_multi_fd_sink_remove_flush (GstMultiFdSink * sink, int fd)
{
GList *clink;
GST_DEBUG_OBJECT (sink, "[fd %5d] flushing client", fd);
CLIENTS_LOCK (sink);
clink = g_hash_table_lookup (sink->fd_hash, &fd);
if (clink != NULL) {
GstTCPClient *client = (GstTCPClient *) clink->data;
if (client->status != GST_CLIENT_STATUS_OK) {
GST_INFO_OBJECT (sink,
"[fd %5d] Client already disconnecting with status %d",
fd, client->status);
goto done;
}
/* take the position of the client as the number of buffers left to flush.
* If the client was at position -1, we flush 0 buffers, 0 == flush 1
* buffer, etc... */
client->flushcount = client->bufpos + 1;
/* mark client as flushing. We can not remove the client right away because
* it might have some buffers to flush in the ->sending queue. */
client->status = GST_CLIENT_STATUS_FLUSHING;
} else {
GST_WARNING_OBJECT (sink, "[fd %5d] no client with this fd found!", fd);
}
done:
CLIENTS_UNLOCK (sink);
}
/* can be called both through the signal (i.e. from any thread) or when /* can be called both through the signal (i.e. from any thread) or when
* stopping, after the writing thread has shut down */ * stopping, after the writing thread has shut down */
void void
@ -948,9 +998,11 @@ gst_multi_fd_sink_remove_client_link (GstMultiFdSink * sink, GList * link)
GST_WARNING_OBJECT (sink, GST_WARNING_OBJECT (sink,
"[fd %5d] removing client %p because of error", fd, client); "[fd %5d] removing client %p because of error", fd, client);
break; break;
case GST_CLIENT_STATUS_FLUSHING:
default: default:
GST_WARNING_OBJECT (sink, GST_WARNING_OBJECT (sink,
"[fd %5d] removing client %p with invalid reason", fd, client); "[fd %5d] removing client %p with invalid reason %d", fd, client,
client->status);
break; break;
} }
@ -1770,15 +1822,19 @@ gst_multi_fd_sink_handle_client_write (GstMultiFdSink * sink,
int fd = client->fd.fd; int fd = client->fd.fd;
gboolean more; gboolean more;
gboolean res; gboolean res;
gboolean flushing;
GstClockTime now; GstClockTime now;
GTimeVal nowtv; GTimeVal nowtv;
g_get_current_time (&nowtv); g_get_current_time (&nowtv);
now = GST_TIMEVAL_TO_TIME (nowtv); now = GST_TIMEVAL_TO_TIME (nowtv);
flushing = client->status == GST_CLIENT_STATUS_FLUSHING;
/* when using GDP, first check if we have queued caps yet */ /* when using GDP, first check if we have queued caps yet */
if (sink->protocol == GST_TCP_PROTOCOL_GDP) { if (sink->protocol == GST_TCP_PROTOCOL_GDP) {
if (!client->caps_sent) { /* don't need to do anything when the client is flushing */
if (!client->caps_sent && !flushing) {
GstPad *peer; GstPad *peer;
GstCaps *caps; GstCaps *caps;
@ -1818,6 +1874,10 @@ gst_multi_fd_sink_handle_client_write (GstMultiFdSink * sink,
/* client is too fast, remove from write queue until new buffer is /* client is too fast, remove from write queue until new buffer is
* available */ * available */
gst_fdset_fd_ctl_write (sink->fdset, &client->fd, FALSE); gst_fdset_fd_ctl_write (sink->fdset, &client->fd, FALSE);
/* if we flushed out all of the client buffers, we can stop */
if (client->flushcount == 0)
goto flushed;
return TRUE; return TRUE;
} else { } else {
/* client can pick a buffer from the global queue */ /* client can pick a buffer from the global queue */
@ -1825,7 +1885,7 @@ gst_multi_fd_sink_handle_client_write (GstMultiFdSink * sink,
/* for new connections, we need to find a good spot in the /* for new connections, we need to find a good spot in the
* bufqueue to start streaming from */ * bufqueue to start streaming from */
if (client->new_connection) { if (client->new_connection && !flushing) {
gint position = gst_multi_fd_sink_new_client (sink, client); gint position = gst_multi_fd_sink_new_client (sink, client);
if (position >= 0) { if (position >= 0) {
@ -1839,9 +1899,18 @@ gst_multi_fd_sink_handle_client_write (GstMultiFdSink * sink,
} }
} }
/* we flushed all remaining buffers, no need to get a new one */
if (client->flushcount == 0)
goto flushed;
/* grab buffer */ /* grab buffer */
buf = g_array_index (sink->bufqueue, GstBuffer *, client->bufpos); buf = g_array_index (sink->bufqueue, GstBuffer *, client->bufpos);
client->bufpos--; client->bufpos--;
/* decrease flushcount */
if (client->flushcount != -1)
client->flushcount--;
GST_LOG_OBJECT (sink, "[fd %5d] client %p at position %d", GST_LOG_OBJECT (sink, "[fd %5d] client %p at position %d",
fd, client, client->bufpos); fd, client, client->bufpos);
@ -1912,6 +1981,12 @@ gst_multi_fd_sink_handle_client_write (GstMultiFdSink * sink,
return TRUE; return TRUE;
/* ERRORS */ /* ERRORS */
flushed:
{
GST_DEBUG_OBJECT (sink, "[fd %5d] flushed, removing", fd);
client->status = GST_CLIENT_STATUS_REMOVED;
return FALSE;
}
connection_reset: connection_reset:
{ {
GST_DEBUG_OBJECT (sink, "[fd %5d] connection reset by peer, removing", fd); GST_DEBUG_OBJECT (sink, "[fd %5d] connection reset by peer, removing", fd);
@ -2311,7 +2386,8 @@ restart2:
client = (GstTCPClient *) clients->data; client = (GstTCPClient *) clients->data;
next = g_list_next (clients); next = g_list_next (clients);
if (client->status != GST_CLIENT_STATUS_OK) { if (client->status != GST_CLIENT_STATUS_FLUSHING
&& client->status != GST_CLIENT_STATUS_OK) {
gst_multi_fd_sink_remove_client_link (sink, clients); gst_multi_fd_sink_remove_client_link (sink, clients);
continue; continue;
} }

View file

@ -50,7 +50,7 @@ typedef struct _GstMultiFdSinkClass GstMultiFdSinkClass;
typedef enum { typedef enum {
GST_MULTI_FD_SINK_OPEN = (GST_ELEMENT_FLAG_LAST << 0), GST_MULTI_FD_SINK_OPEN = (GST_ELEMENT_FLAG_LAST << 0),
GST_MULTI_FD_SINK_FLAG_LAST = (GST_ELEMENT_FLAG_LAST << 2), GST_MULTI_FD_SINK_FLAG_LAST = (GST_ELEMENT_FLAG_LAST << 2)
} GstMultiFdSinkFlags; } GstMultiFdSinkFlags;
/** /**
@ -68,7 +68,7 @@ typedef enum
GST_RECOVER_POLICY_NONE, GST_RECOVER_POLICY_NONE,
GST_RECOVER_POLICY_RESYNC_LATEST, GST_RECOVER_POLICY_RESYNC_LATEST,
GST_RECOVER_POLICY_RESYNC_SOFT_LIMIT, GST_RECOVER_POLICY_RESYNC_SOFT_LIMIT,
GST_RECOVER_POLICY_RESYNC_KEYFRAME, GST_RECOVER_POLICY_RESYNC_KEYFRAME
} GstRecoverPolicy; } GstRecoverPolicy;
/** /**
@ -93,7 +93,7 @@ typedef enum
GST_SYNC_METHOD_LATEST_KEYFRAME, GST_SYNC_METHOD_LATEST_KEYFRAME,
GST_SYNC_METHOD_BURST, GST_SYNC_METHOD_BURST,
GST_SYNC_METHOD_BURST_KEYFRAME, GST_SYNC_METHOD_BURST_KEYFRAME,
GST_SYNC_METHOD_BURST_WITH_KEYFRAME, GST_SYNC_METHOD_BURST_WITH_KEYFRAME
} GstSyncMethod; } GstSyncMethod;
/** /**
@ -110,7 +110,7 @@ typedef enum
GST_UNIT_TYPE_UNDEFINED, GST_UNIT_TYPE_UNDEFINED,
GST_UNIT_TYPE_BUFFERS, GST_UNIT_TYPE_BUFFERS,
GST_UNIT_TYPE_TIME, GST_UNIT_TYPE_TIME,
GST_UNIT_TYPE_BYTES, GST_UNIT_TYPE_BYTES
} GstUnitType; } GstUnitType;
/** /**
@ -121,6 +121,7 @@ typedef enum
* @GST_CLIENT_STATUS_SLOW : client is too slow * @GST_CLIENT_STATUS_SLOW : client is too slow
* @GST_CLIENT_STATUS_ERROR : client is in error * @GST_CLIENT_STATUS_ERROR : client is in error
* @GST_CLIENT_STATUS_DUPLICATE: same client added twice * @GST_CLIENT_STATUS_DUPLICATE: same client added twice
* @GST_CLIENT_STATUS_FLUSHING : client is flushing out the remaining buffers.
* *
* This specifies the reason why a client was removed from * This specifies the reason why a client was removed from
* multifdsink and is received in the "client-removed" signal. * multifdsink and is received in the "client-removed" signal.
@ -133,6 +134,7 @@ typedef enum
GST_CLIENT_STATUS_SLOW = 3, GST_CLIENT_STATUS_SLOW = 3,
GST_CLIENT_STATUS_ERROR = 4, GST_CLIENT_STATUS_ERROR = 4,
GST_CLIENT_STATUS_DUPLICATE = 5, GST_CLIENT_STATUS_DUPLICATE = 5,
GST_CLIENT_STATUS_FLUSHING = 6
} GstClientStatus; } GstClientStatus;
/* structure for a client /* structure for a client
@ -141,6 +143,8 @@ typedef struct {
GstFD fd; GstFD fd;
gint bufpos; /* position of this client in the global queue */ gint bufpos; /* position of this client in the global queue */
gint flushcount; /* the remaining number of buffers to flush out or -1 if the
client is not flushing. */
GstClientStatus status; GstClientStatus status;
gboolean is_socket; gboolean is_socket;
@ -249,6 +253,7 @@ struct _GstMultiFdSinkClass {
GstUnitType format, guint64 value, GstUnitType format, guint64 value,
GstUnitType max_unit, guint64 max_value); GstUnitType max_unit, guint64 max_value);
void (*remove) (GstMultiFdSink *sink, int fd); void (*remove) (GstMultiFdSink *sink, int fd);
void (*remove_flush) (GstMultiFdSink *sink, int fd);
void (*clear) (GstMultiFdSink *sink); void (*clear) (GstMultiFdSink *sink);
GValueArray* (*get_stats) (GstMultiFdSink *sink, int fd); GValueArray* (*get_stats) (GstMultiFdSink *sink, int fd);
@ -266,14 +271,14 @@ struct _GstMultiFdSinkClass {
GType gst_multi_fd_sink_get_type (void); GType gst_multi_fd_sink_get_type (void);
void gst_multi_fd_sink_add (GstMultiFdSink *sink, int fd); void gst_multi_fd_sink_add (GstMultiFdSink *sink, int fd);
void gst_multi_fd_sink_add_full (GstMultiFdSink *sink, int fd, GstSyncMethod sync, void gst_multi_fd_sink_add_full (GstMultiFdSink *sink, int fd, GstSyncMethod sync,
GstUnitType min_unit, guint64 min_value, GstUnitType min_unit, guint64 min_value,
GstUnitType max_unit, guint64 max_value); GstUnitType max_unit, guint64 max_value);
void gst_multi_fd_sink_remove (GstMultiFdSink *sink, int fd); void gst_multi_fd_sink_remove (GstMultiFdSink *sink, int fd);
void gst_multi_fd_sink_clear (GstMultiFdSink *sink); void gst_multi_fd_sink_remove_flush (GstMultiFdSink *sink, int fd);
GValueArray* gst_multi_fd_sink_get_stats (GstMultiFdSink *sink, int fd); void gst_multi_fd_sink_clear (GstMultiFdSink *sink);
GValueArray* gst_multi_fd_sink_get_stats (GstMultiFdSink *sink, int fd);
G_END_DECLS G_END_DECLS