gst/tcp: Factor out common symbols; fix tests.

This commit is contained in:
Thomas Vander Stichele 2012-01-26 10:41:22 +01:00
parent 3b0fae73ae
commit 684aa4baaf
8 changed files with 3123 additions and 231 deletions

View file

@ -121,7 +121,7 @@
#endif
#include "gstmultifdsink.h"
#include "gstmultifdsink-marshal.h"
#include "gsttcp-marshal.h"
#define NOT_IMPLEMENTED 0
@ -533,7 +533,7 @@ gst_multi_fd_sink_class_init (GstMultiFdSinkClass * klass)
g_signal_new ("add-full", G_TYPE_FROM_CLASS (klass),
G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstMultiFdSinkClass,
add_full), NULL, NULL,
gst_multi_fd_sink_marshal_VOID__INT_ENUM_INT_UINT64_INT_UINT64,
gst_tcp_marshal_VOID__INT_ENUM_INT_UINT64_INT_UINT64,
G_TYPE_NONE, 6, G_TYPE_INT, GST_TYPE_SYNC_METHOD, GST_TYPE_UNIT_TYPE,
G_TYPE_UINT64, GST_TYPE_UNIT_TYPE, G_TYPE_UINT64);
/**
@ -546,7 +546,7 @@ gst_multi_fd_sink_class_init (GstMultiFdSinkClass * klass)
gst_multi_fd_sink_signals[SIGNAL_REMOVE] =
g_signal_new ("remove", G_TYPE_FROM_CLASS (klass),
G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstMultiFdSinkClass,
remove), NULL, NULL, gst_multi_fd_sink_marshal_VOID__INT, G_TYPE_NONE,
remove), NULL, NULL, gst_tcp_marshal_VOID__INT, G_TYPE_NONE,
1, G_TYPE_INT);
/**
* GstMultiFdSink::remove-flush:
@ -559,7 +559,7 @@ gst_multi_fd_sink_class_init (GstMultiFdSinkClass * klass)
gst_multi_fd_sink_signals[SIGNAL_REMOVE_FLUSH] =
g_signal_new ("remove-flush", G_TYPE_FROM_CLASS (klass),
G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstMultiFdSinkClass,
remove_flush), NULL, NULL, gst_multi_fd_sink_marshal_VOID__INT,
remove_flush), NULL, NULL, gst_tcp_marshal_VOID__INT,
G_TYPE_NONE, 1, G_TYPE_INT);
/**
* GstMultiFdSink::clear:
@ -593,7 +593,7 @@ gst_multi_fd_sink_class_init (GstMultiFdSinkClass * klass)
gst_multi_fd_sink_signals[SIGNAL_GET_STATS] =
g_signal_new ("get-stats", G_TYPE_FROM_CLASS (klass),
G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstMultiFdSinkClass,
get_stats), NULL, NULL, gst_multi_fd_sink_marshal_BOXED__INT,
get_stats), NULL, NULL, gst_tcp_marshal_BOXED__INT,
G_TYPE_VALUE_ARRAY, 1, G_TYPE_INT);
/**
@ -608,8 +608,7 @@ gst_multi_fd_sink_class_init (GstMultiFdSinkClass * klass)
gst_multi_fd_sink_signals[SIGNAL_CLIENT_ADDED] =
g_signal_new ("client-added", G_TYPE_FROM_CLASS (klass),
G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstMultiFdSinkClass, client_added),
NULL, NULL, gst_multi_fd_sink_marshal_VOID__INT, G_TYPE_NONE, 1,
G_TYPE_INT);
NULL, NULL, gst_tcp_marshal_VOID__INT, G_TYPE_NONE, 1, G_TYPE_INT);
/**
* GstMultiFdSink::client-removed:
* @gstmultifdsink: the multifdsink element that emitted this signal
@ -627,7 +626,7 @@ gst_multi_fd_sink_class_init (GstMultiFdSinkClass * klass)
gst_multi_fd_sink_signals[SIGNAL_CLIENT_REMOVED] =
g_signal_new ("client-removed", G_TYPE_FROM_CLASS (klass),
G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstMultiFdSinkClass,
client_removed), NULL, NULL, gst_multi_fd_sink_marshal_VOID__INT_ENUM,
client_removed), NULL, NULL, gst_tcp_marshal_VOID__INT_ENUM,
G_TYPE_NONE, 2, G_TYPE_INT, GST_TYPE_CLIENT_STATUS);
/**
* GstMultiFdSink::client-fd-removed:
@ -647,7 +646,7 @@ gst_multi_fd_sink_class_init (GstMultiFdSinkClass * klass)
gst_multi_fd_sink_signals[SIGNAL_CLIENT_FD_REMOVED] =
g_signal_new ("client-fd-removed", G_TYPE_FROM_CLASS (klass),
G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstMultiFdSinkClass,
client_fd_removed), NULL, NULL, gst_multi_fd_sink_marshal_VOID__INT,
client_fd_removed), NULL, NULL, gst_tcp_marshal_VOID__INT,
G_TYPE_NONE, 1, G_TYPE_INT);
gst_element_class_add_pad_template (gstelement_class,
@ -1974,14 +1973,15 @@ gst_multi_fd_sink_handle_client_write (GstMultiFdSink * sink,
if (client->sending) {
ssize_t wrote;
GstBuffer *head;
GstMapInfo info;
guint8 *data;
gsize size;
/* pick first buffer from list */
head = GST_BUFFER (client->sending->data);
data = gst_buffer_map (head, &size, NULL, GST_MAP_READ);
maxsize = size - client->bufoffset;
g_assert (gst_buffer_map (head, &info, GST_MAP_READ));
data = info.data;
maxsize = info.size - client->bufoffset;
/* try to write the complete buffer */
#ifdef MSG_NOSIGNAL
@ -1994,7 +1994,7 @@ gst_multi_fd_sink_handle_client_write (GstMultiFdSink * sink,
} else {
wrote = write (fd, data + client->bufoffset, maxsize);
}
gst_buffer_unmap (head, data, size);
gst_buffer_unmap (head, &info);
if (wrote < 0) {
/* hmm error.. */
@ -2523,6 +2523,7 @@ gst_multi_fd_sink_render (GstBaseSink * bsink, GstBuffer * buf)
gst_buffer_ref (buf);
}
#endif
gst_buffer_ref (buf);
GST_LOG_OBJECT (sink, "received buffer %p, in_caps: %s, offset %"
G_GINT64_FORMAT ", offset_end %" G_GINT64_FORMAT
@ -2893,19 +2894,3 @@ start_failed:
return GST_STATE_CHANGE_FAILURE;
}
}
static gboolean
plugin_init (GstPlugin * plugin)
{
if (!gst_element_register (plugin, "multifdsink", GST_RANK_NONE,
GST_TYPE_MULTI_FD_SINK))
return FALSE;
return TRUE;
}
GST_PLUGIN_DEFINE (GST_VERSION_MAJOR,
GST_VERSION_MINOR,
"multifdsink",
"transfer data to multiple fds",
plugin_init, VERSION, GST_LICENSE, GST_PACKAGE_NAME, GST_PACKAGE_ORIGIN)

View file

@ -25,6 +25,8 @@
#include <gst/gst.h>
#include <gst/base/gstbasesink.h>
#include "gstmultihandlesink.h"
G_BEGIN_DECLS
#define GST_TYPE_MULTI_FD_SINK \
@ -50,48 +52,6 @@ typedef enum {
GST_MULTI_FD_SINK_FLAG_LAST = (GST_ELEMENT_FLAG_LAST << 2)
} GstMultiFdSinkFlags;
/**
* GstRecoverPolicy:
* @GST_RECOVER_POLICY_NONE : no recovering is done
* @GST_RECOVER_POLICY_RESYNC_LATEST : client is moved to last buffer
* @GST_RECOVER_POLICY_RESYNC_SOFT_LIMIT: client is moved to the soft limit
* @GST_RECOVER_POLICY_RESYNC_KEYFRAME : client is moved to latest keyframe
*
* Possible values for the recovery procedure to use when a client consumes
* data too slow and has a backlag of more that soft-limit buffers.
*/
typedef enum
{
GST_RECOVER_POLICY_NONE,
GST_RECOVER_POLICY_RESYNC_LATEST,
GST_RECOVER_POLICY_RESYNC_SOFT_LIMIT,
GST_RECOVER_POLICY_RESYNC_KEYFRAME
} GstRecoverPolicy;
/**
* GstSyncMethod:
* @GST_SYNC_METHOD_LATEST : client receives most recent buffer
* @GST_SYNC_METHOD_NEXT_KEYFRAME : client receives next keyframe
* @GST_SYNC_METHOD_LATEST_KEYFRAME : client receives latest keyframe (burst)
* @GST_SYNC_METHOD_BURST : client receives specific amount of data
* @GST_SYNC_METHOD_BURST_KEYFRAME : client receives specific amount of data
* starting from latest keyframe
* @GST_SYNC_METHOD_BURST_WITH_KEYFRAME : client receives specific amount of data from
* a keyframe, or if there is not enough data after
* the keyframe, starting before the keyframe
*
* This enum defines the selection of the first buffer that is sent
* to a new client.
*/
typedef enum
{
GST_SYNC_METHOD_LATEST,
GST_SYNC_METHOD_NEXT_KEYFRAME,
GST_SYNC_METHOD_LATEST_KEYFRAME,
GST_SYNC_METHOD_BURST,
GST_SYNC_METHOD_BURST_KEYFRAME,
GST_SYNC_METHOD_BURST_WITH_KEYFRAME
} GstSyncMethod;
/**
* GstTCPUnitType:
@ -110,30 +70,6 @@ typedef enum
GST_TCP_UNIT_TYPE_BYTES
} GstTCPUnitType;
/**
* GstClientStatus:
* @GST_CLIENT_STATUS_OK : client is ok
* @GST_CLIENT_STATUS_CLOSED : client closed the socket
* @GST_CLIENT_STATUS_REMOVED : client is removed
* @GST_CLIENT_STATUS_SLOW : client is too slow
* @GST_CLIENT_STATUS_ERROR : client is in error
* @GST_CLIENT_STATUS_DUPLICATE: same client added twice
* @GST_CLIENT_STATUS_FLUSHING : client is flushing out the remaining buffers.
*
* This specifies the reason why a client was removed from
* multifdsink and is received in the "client-removed" signal.
*/
typedef enum
{
GST_CLIENT_STATUS_OK = 0,
GST_CLIENT_STATUS_CLOSED = 1,
GST_CLIENT_STATUS_REMOVED = 2,
GST_CLIENT_STATUS_SLOW = 3,
GST_CLIENT_STATUS_ERROR = 4,
GST_CLIENT_STATUS_DUPLICATE = 5,
GST_CLIENT_STATUS_FLUSHING = 6
} GstClientStatus;
/* structure for a client
*/
typedef struct {
@ -176,11 +112,6 @@ typedef struct {
guint64 last_buffer_ts;
} GstTCPClient;
#define CLIENTS_LOCK_INIT(fdsink) (g_rec_mutex_init(&fdsink->clientslock))
#define CLIENTS_LOCK_CLEAR(fdsink) (g_rec_mutex_clear(&fdsink->clientslock))
#define CLIENTS_LOCK(fdsink) (g_rec_mutex_lock(&fdsink->clientslock))
#define CLIENTS_UNLOCK(fdsink) (g_rec_mutex_unlock(&fdsink->clientslock))
/**
* GstMultiFdSink:
*

2788
gst/tcp/gstmultihandlesink.c Normal file

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,273 @@
/* GStreamer
* Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu>
* Copyright (C) <2004> Thomas Vander Stichele <thomas at apestaart dot org>
* Copyright (C) <2011> Collabora Ltd.
* Author: Sebastian Dröge <sebastian.droege@collabora.co.uk>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 59 Temple Place - Suite 330,
* Boston, MA 02111-1307, USA.
*/
#ifndef __GST_MULTI_HANDLE_SINK_H__
#define __GST_MULTI_HANDLE_SINK_H__
#include <gst/gst.h>
#include <gst/base/gstbasesink.h>
#include <gio/gio.h>
G_BEGIN_DECLS
#define GST_TYPE_MULTI_HANDLE_SINK \
(gst_multi_handle_sink_get_type())
#define GST_MULTI_HANDLE_SINK(obj) \
(G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_MULTI_HANDLE_SINK,GstMultiHandleSink))
#define GST_MULTI_HANDLE_SINK_CLASS(klass) \
(G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_MULTI_HANDLE_SINK,GstMultiHandleSinkClass))
#define GST_IS_MULTI_HANDLE_SINK(obj) \
(G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_MULTI_HANDLE_SINK))
#define GST_IS_MULTI_HANDLE_SINK_CLASS(klass) \
(G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_MULTI_HANDLE_SINK))
#define GST_MULTI_HANDLE_SINK_GET_CLASS(klass) \
(G_TYPE_INSTANCE_GET_CLASS ((klass), GST_TYPE_MULTI_HANDLE_SINK, GstMultiHandleSinkClass))
typedef struct _GstMultiHandleSink GstMultiHandleSink;
typedef struct _GstMultiHandleSinkClass GstMultiHandleSinkClass;
#if 0
typedef enum {
GST_MULTI_HANDLE_SINK_OPEN = (GST_ELEMENT_FLAG_LAST << 0),
GST_MULTI_HANDLE_SINK_FLAG_LAST = (GST_ELEMENT_FLAG_LAST << 2)
} GstMultiHandleSinkFlags;
#endif
/**
* GstRecoverPolicy:
* @GST_RECOVER_POLICY_NONE : no recovering is done
* @GST_RECOVER_POLICY_RESYNC_LATEST : client is moved to last buffer
* @GST_RECOVER_POLICY_RESYNC_SOFT_LIMIT: client is moved to the soft limit
* @GST_RECOVER_POLICY_RESYNC_KEYFRAME : client is moved to latest keyframe
*
* Possible values for the recovery procedure to use when a client consumes
* data too slow and has a backlag of more that soft-limit buffers.
*/
typedef enum
{
GST_RECOVER_POLICY_NONE,
GST_RECOVER_POLICY_RESYNC_LATEST,
GST_RECOVER_POLICY_RESYNC_SOFT_LIMIT,
GST_RECOVER_POLICY_RESYNC_KEYFRAME
} GstRecoverPolicy;
/**
* GstSyncMethod:
* @GST_SYNC_METHOD_LATEST : client receives most recent buffer
* @GST_SYNC_METHOD_NEXT_KEYFRAME : client receives next keyframe
* @GST_SYNC_METHOD_LATEST_KEYFRAME : client receives latest keyframe (burst)
* @GST_SYNC_METHOD_BURST : client receives specific amount of data
* @GST_SYNC_METHOD_BURST_KEYFRAME : client receives specific amount of data
* starting from latest keyframe
* @GST_SYNC_METHOD_BURST_WITH_KEYFRAME : client receives specific amount of data from
* a keyframe, or if there is not enough data after
* the keyframe, starting before the keyframe
*
* This enum defines the selection of the first buffer that is sent
* to a new client.
*/
typedef enum
{
GST_SYNC_METHOD_LATEST,
GST_SYNC_METHOD_NEXT_KEYFRAME,
GST_SYNC_METHOD_LATEST_KEYFRAME,
GST_SYNC_METHOD_BURST,
GST_SYNC_METHOD_BURST_KEYFRAME,
GST_SYNC_METHOD_BURST_WITH_KEYFRAME
} GstSyncMethod;
/**
* GstClientStatus:
* @GST_CLIENT_STATUS_OK : client is ok
* @GST_CLIENT_STATUS_CLOSED : client closed the socket
* @GST_CLIENT_STATUS_REMOVED : client is removed
* @GST_CLIENT_STATUS_SLOW : client is too slow
* @GST_CLIENT_STATUS_ERROR : client is in error
* @GST_CLIENT_STATUS_DUPLICATE: same client added twice
* @GST_CLIENT_STATUS_FLUSHING : client is flushing out the remaining buffers.
*
* This specifies the reason why a client was removed from
* multisocketsink and is received in the "client-removed" signal.
*/
typedef enum
{
GST_CLIENT_STATUS_OK = 0,
GST_CLIENT_STATUS_CLOSED = 1,
GST_CLIENT_STATUS_REMOVED = 2,
GST_CLIENT_STATUS_SLOW = 3,
GST_CLIENT_STATUS_ERROR = 4,
GST_CLIENT_STATUS_DUPLICATE = 5,
GST_CLIENT_STATUS_FLUSHING = 6
} GstClientStatus;
#if 0
/* structure for a client
*/
typedef struct {
GSocket *socket;
GSource *source;
gint bufpos; /* position of this client in the global queue */
gint flushcount; /* the remaining number of buffers to flush out or -1 if the
client is not flushing. */
GstClientStatus status;
GSList *sending; /* the buffers we need to send */
gint bufoffset; /* offset in the first buffer */
gboolean discont;
gboolean new_connection;
gboolean currently_removing;
/* method to sync client when connecting */
GstSyncMethod sync_method;
GstFormat burst_min_format;
guint64 burst_min_value;
GstFormat burst_max_format;
guint64 burst_max_value;
GstCaps *caps; /* caps of last queued buffer */
/* stats */
guint64 bytes_sent;
guint64 connect_time;
guint64 disconnect_time;
guint64 last_activity_time;
guint64 dropped_buffers;
guint64 avg_queue_size;
guint64 first_buffer_ts;
guint64 last_buffer_ts;
} GstSocketClient;
#endif
#define CLIENTS_LOCK_INIT(socketsink) (g_rec_mutex_init(&socketsink->clientslock))
#define CLIENTS_LOCK_CLEAR(socketsink) (g_rec_mutex_clear(&socketsink->clientslock))
#define CLIENTS_LOCK(socketsink) (g_rec_mutex_lock(&socketsink->clientslock))
#define CLIENTS_UNLOCK(socketsink) (g_rec_mutex_unlock(&socketsink->clientslock))
/**
* GstMultiHandleSink:
*
* The multisocketsink object structure.
*/
struct _GstMultiHandleSink {
GstBaseSink element;
/*< private >*/
guint64 bytes_to_serve; /* how much bytes we must serve */
guint64 bytes_served; /* how much bytes have we served */
GRecMutex clientslock; /* lock to protect the clients list */
GList *clients; /* list of clients we are serving */
GHashTable *socket_hash; /* index on socket to client */
guint clients_cookie; /* Cookie to detect changes to the clients list */
GMainContext *main_context;
GCancellable *cancellable;
GSList *streamheader; /* GSList of GstBuffers to use as streamheader */
gboolean previous_buffer_in_caps;
guint mtu;
gint qos_dscp;
gboolean handle_read;
GArray *bufqueue; /* global queue of buffers */
gboolean running; /* the thread state */
GThread *thread; /* the sender thread */
/* these values are used to check if a client is reading fast
* enough and to control receovery */
GstFormat unit_type;/* the format of the units */
gint64 units_max; /* max units to queue for a client */
gint64 units_soft_max; /* max units a client can lag before recovery starts */
GstRecoverPolicy recover_policy;
GstClockTime timeout; /* max amount of nanoseconds to remain idle */
GstSyncMethod def_sync_method; /* what method to use for connecting clients */
GstFormat def_burst_format;
guint64 def_burst_value;
/* these values are used to control the amount of data
* kept in the queues. It allows clients to perform a burst
* on connect. */
gint bytes_min; /* min number of bytes to queue */
gint64 time_min; /* min time to queue */
gint buffers_min; /* min number of buffers to queue */
gboolean resend_streamheader; /* resend streamheader if it changes */
/* stats */
gint buffers_queued; /* number of queued buffers */
gint bytes_queued; /* number of queued bytes */
gint time_queued; /* number of queued time */
guint8 header_flags;
};
struct _GstMultiHandleSinkClass {
GstBaseSinkClass parent_class;
/* element methods */
void (*add) (GstMultiHandleSink *sink, GSocket *socket);
void (*add_full) (GstMultiHandleSink *sink, GSocket *socket, GstSyncMethod sync,
GstFormat format, guint64 value,
GstFormat max_format, guint64 max_value);
void (*remove) (GstMultiHandleSink *sink, GSocket *socket);
void (*remove_flush) (GstMultiHandleSink *sink, GSocket *socket);
void (*clear) (GstMultiHandleSink *sink);
GstStructure* (*get_stats) (GstMultiHandleSink *sink, GSocket *socket);
/* vtable */
gboolean (*init) (GstMultiHandleSink *sink);
gboolean (*close) (GstMultiHandleSink *sink);
void (*removed) (GstMultiHandleSink *sink, GSocket *socket);
/* signals */
void (*client_added) (GstElement *element, GSocket *socket);
void (*client_removed) (GstElement *element, GSocket *socket, GstClientStatus status);
void (*client_socket_removed) (GstElement *element, GSocket *socket);
};
GType gst_multi_handle_sink_get_type (void);
#if 0
void gst_multi_handle_sink_add (GstMultiHandleSink *sink, GSocket *socket);
void gst_multi_handle_sink_add_full (GstMultiHandleSink *sink, GSocket *socket, GstSyncMethod sync,
GstFormat min_format, guint64 min_value,
GstFormat max_format, guint64 max_value);
void gst_multi_handle_sink_remove (GstMultiHandleSink *sink, GSocket *socket);
void gst_multi_handle_sink_remove_flush (GstMultiHandleSink *sink, GSocket *socket);
void gst_multi_handle_sink_clear (GstMultiHandleSink *sink);
GstStructure* gst_multi_handle_sink_get_stats (GstMultiHandleSink *sink, GSocket *socket);
G_END_DECLS
#endif
#endif /* __GST_MULTI_HANDLE_SINK_H__ */

View file

@ -24,9 +24,12 @@
#ifndef __GST_MULTI_SOCKET_SINK_H__
#define __GST_MULTI_SOCKET_SINK_H__
#include <gio/gio.h>
#include <gst/gst.h>
#include <gst/base/gstbasesink.h>
#include <gio/gio.h>
#include "gstmultihandlesink.h"
G_BEGIN_DECLS
@ -53,73 +56,6 @@ typedef enum {
GST_MULTI_SOCKET_SINK_FLAG_LAST = (GST_ELEMENT_FLAG_LAST << 2)
} GstMultiSocketSinkFlags;
/**
* GstRecoverPolicy:
* @GST_RECOVER_POLICY_NONE : no recovering is done
* @GST_RECOVER_POLICY_RESYNC_LATEST : client is moved to last buffer
* @GST_RECOVER_POLICY_RESYNC_SOFT_LIMIT: client is moved to the soft limit
* @GST_RECOVER_POLICY_RESYNC_KEYFRAME : client is moved to latest keyframe
*
* Possible values for the recovery procedure to use when a client consumes
* data too slow and has a backlag of more that soft-limit buffers.
*/
typedef enum
{
GST_RECOVER_POLICY_NONE,
GST_RECOVER_POLICY_RESYNC_LATEST,
GST_RECOVER_POLICY_RESYNC_SOFT_LIMIT,
GST_RECOVER_POLICY_RESYNC_KEYFRAME
} GstRecoverPolicy;
/**
* GstSyncMethod:
* @GST_SYNC_METHOD_LATEST : client receives most recent buffer
* @GST_SYNC_METHOD_NEXT_KEYFRAME : client receives next keyframe
* @GST_SYNC_METHOD_LATEST_KEYFRAME : client receives latest keyframe (burst)
* @GST_SYNC_METHOD_BURST : client receives specific amount of data
* @GST_SYNC_METHOD_BURST_KEYFRAME : client receives specific amount of data
* starting from latest keyframe
* @GST_SYNC_METHOD_BURST_WITH_KEYFRAME : client receives specific amount of data from
* a keyframe, or if there is not enough data after
* the keyframe, starting before the keyframe
*
* This enum defines the selection of the first buffer that is sent
* to a new client.
*/
typedef enum
{
GST_SYNC_METHOD_LATEST,
GST_SYNC_METHOD_NEXT_KEYFRAME,
GST_SYNC_METHOD_LATEST_KEYFRAME,
GST_SYNC_METHOD_BURST,
GST_SYNC_METHOD_BURST_KEYFRAME,
GST_SYNC_METHOD_BURST_WITH_KEYFRAME
} GstSyncMethod;
/**
* GstClientStatus:
* @GST_CLIENT_STATUS_OK : client is ok
* @GST_CLIENT_STATUS_CLOSED : client closed the socket
* @GST_CLIENT_STATUS_REMOVED : client is removed
* @GST_CLIENT_STATUS_SLOW : client is too slow
* @GST_CLIENT_STATUS_ERROR : client is in error
* @GST_CLIENT_STATUS_DUPLICATE: same client added twice
* @GST_CLIENT_STATUS_FLUSHING : client is flushing out the remaining buffers.
*
* This specifies the reason why a client was removed from
* multisocketsink and is received in the "client-removed" signal.
*/
typedef enum
{
GST_CLIENT_STATUS_OK = 0,
GST_CLIENT_STATUS_CLOSED = 1,
GST_CLIENT_STATUS_REMOVED = 2,
GST_CLIENT_STATUS_SLOW = 3,
GST_CLIENT_STATUS_ERROR = 4,
GST_CLIENT_STATUS_DUPLICATE = 5,
GST_CLIENT_STATUS_FLUSHING = 6
} GstClientStatus;
/* structure for a client
*/
typedef struct {

View file

@ -2,3 +2,7 @@ VOID:STRING,UINT
VOID:OBJECT,ENUM
VOID:OBJECT,ENUM,ENUM,UINT64,ENUM,UINT64
BOXED:OBJECT
VOID:INT,ENUM,INT,UINT64,INT,UINT64
VOID:INT
VOID:INT,ENUM
BOXED:INT

View file

@ -25,6 +25,7 @@
#include "gsttcpclientsink.h"
#include "gsttcpserversrc.h"
#include "gsttcpserversink.h"
#include "gstmultifdsink.h"
#include "gstmultisocketsink.h"
GST_DEBUG_CATEGORY (tcp_debug);
@ -44,6 +45,9 @@ plugin_init (GstPlugin * plugin)
if (!gst_element_register (plugin, "tcpserversrc", GST_RANK_NONE,
GST_TYPE_TCP_SERVER_SRC))
return FALSE;
if (!gst_element_register (plugin, "multifdsink", GST_RANK_NONE,
GST_TYPE_MULTI_FD_SINK))
return FALSE;
if (!gst_element_register (plugin, "multisocketsink", GST_RANK_NONE,
GST_TYPE_MULTI_SOCKET_SINK))
return FALSE;

View file

@ -42,6 +42,7 @@ setup_multifdsink (void)
GST_DEBUG ("setup_multifdsink");
multifdsink = gst_check_setup_element ("multifdsink");
mysrcpad = gst_check_setup_src_pad (multifdsink, &srctemplate);
GST_PAD_UNSET_FLUSHING (mysrcpad);
return multifdsink;
}
@ -117,10 +118,11 @@ GST_START_TEST (test_add_client)
g_signal_emit_by_name (sink, "add", pfd[1]);
caps = gst_caps_from_string ("application/x-gst-check");
ASSERT_CAPS_REFCOUNT (caps, "caps", 1);
GST_DEBUG ("Created test caps %p %" GST_PTR_FORMAT, caps, caps);
buffer = gst_buffer_new_and_alloc (4);
gst_pad_set_caps (mysrcpad, caps);
ASSERT_CAPS_REFCOUNT (caps, "caps", 2);
ASSERT_CAPS_REFCOUNT (caps, "caps", 3);
gst_buffer_fill (buffer, 0, "dead", 4);
fail_unless (gst_pad_push (mysrcpad, buffer) == GST_FLOW_OK);
@ -247,9 +249,10 @@ GST_START_TEST (test_streamheader)
* buffers */
gst_multifdsink_create_streamheader ("babe", "deadbeef", &hbuf1, &hbuf2,
&caps);
ASSERT_CAPS_REFCOUNT (caps, "caps", 1);
fail_unless (gst_pad_set_caps (mysrcpad, caps));
/* one is ours, two on the buffers, and one now on the pad */
ASSERT_CAPS_REFCOUNT (caps, "caps", 4);
/* one is ours, two from set_caps */
ASSERT_CAPS_REFCOUNT (caps, "caps", 3);
fail_unless (gst_pad_push (mysrcpad, hbuf1) == GST_FLOW_OK);
fail_unless (gst_pad_push (mysrcpad, hbuf2) == GST_FLOW_OK);
@ -334,9 +337,10 @@ GST_START_TEST (test_change_streamheader)
* buffers */
gst_multifdsink_create_streamheader ("first", "header", &hbuf1, &hbuf2,
&caps);
ASSERT_CAPS_REFCOUNT (caps, "caps", 1);
fail_unless (gst_pad_set_caps (mysrcpad, caps));
/* one is ours, two on the buffers, and one now on the pad */
ASSERT_CAPS_REFCOUNT (caps, "caps", 4);
/* one is ours, two from set_caps */
ASSERT_CAPS_REFCOUNT (caps, "caps", 3);
/* one to hold for the test and one to give away */
ASSERT_BUFFER_REFCOUNT (hbuf1, "hbuf1", 2);
@ -427,11 +431,28 @@ GST_START_TEST (test_change_streamheader)
GST_END_TEST;
static GstBuffer *
gst_new_buffer (int i)
{
GstMapInfo info;
gchar *data;
GstBuffer *buffer = gst_buffer_new_and_alloc (16);
/* copy some id */
g_assert (gst_buffer_map (buffer, &info, GST_MAP_WRITE));
data = (gchar *) info.data;
g_snprintf (data, 16, "deadbee%08x", i);
gst_buffer_unmap (buffer, &info);
return buffer;
}
/* keep 100 bytes and burst 80 bytes to clients */
GST_START_TEST (test_burst_client_bytes)
{
GstElement *sink;
GstBuffer *buffer;
GstCaps *caps;
int pfd1[2];
int pfd2[2];
@ -459,14 +480,7 @@ GST_START_TEST (test_burst_client_bytes)
/* push buffers in, 9 * 16 bytes = 144 bytes */
for (i = 0; i < 9; i++) {
gchar *data;
buffer = gst_buffer_new_and_alloc (16);
/* copy some id */
data = gst_buffer_map (buffer, NULL, NULL, GST_MAP_WRITE);
g_snprintf (data, 16, "deadbee%08x", i);
gst_buffer_unmap (buffer, data, 16);
GstBuffer *buffer = gst_new_buffer (i);
fail_unless (gst_pad_push (mysrcpad, buffer) == GST_FLOW_OK);
}
@ -484,14 +498,7 @@ GST_START_TEST (test_burst_client_bytes)
/* push last buffer to make client fds ready for reading */
for (i = 9; i < 10; i++) {
gchar *data;
buffer = gst_buffer_new_and_alloc (16);
/* copy some id */
data = gst_buffer_map (buffer, NULL, NULL, GST_MAP_WRITE);
g_snprintf (data, 16, "deadbee%08x", i);
gst_buffer_unmap (buffer, data, 16);
GstBuffer *buffer = gst_new_buffer (i);
fail_unless (gst_pad_push (mysrcpad, buffer) == GST_FLOW_OK);
}
@ -545,7 +552,6 @@ GST_END_TEST;
GST_START_TEST (test_burst_client_bytes_keyframe)
{
GstElement *sink;
GstBuffer *buffer;
GstCaps *caps;
int pfd1[2];
int pfd2[2];
@ -573,19 +579,12 @@ GST_START_TEST (test_burst_client_bytes_keyframe)
/* push buffers in, 9 * 16 bytes = 144 bytes */
for (i = 0; i < 9; i++) {
gchar *data;
buffer = gst_buffer_new_and_alloc (16);
GstBuffer *buffer = gst_new_buffer (i);
/* mark most buffers as delta */
if (i != 0 && i != 4 && i != 8)
GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DELTA_UNIT);
/* copy some id */
data = gst_buffer_map (buffer, NULL, NULL, GST_MAP_WRITE);
g_snprintf (data, 16, "deadbee%08x", i);
gst_buffer_unmap (buffer, data, 16);
fail_unless (gst_pad_push (mysrcpad, buffer) == GST_FLOW_OK);
}
@ -602,16 +601,10 @@ GST_START_TEST (test_burst_client_bytes_keyframe)
/* push last buffer to make client fds ready for reading */
for (i = 9; i < 10; i++) {
gchar *data;
GstBuffer *buffer = gst_new_buffer (i);
buffer = gst_buffer_new_and_alloc (16);
GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DELTA_UNIT);
/* copy some id */
data = gst_buffer_map (buffer, NULL, NULL, GST_MAP_WRITE);
g_snprintf (data, 16, "deadbee%08x", i);
gst_buffer_unmap (buffer, data, 16);
fail_unless (gst_pad_push (mysrcpad, buffer) == GST_FLOW_OK);
}
@ -661,7 +654,6 @@ GST_END_TEST;
GST_START_TEST (test_burst_client_bytes_with_keyframe)
{
GstElement *sink;
GstBuffer *buffer;
GstCaps *caps;
int pfd1[2];
int pfd2[2];
@ -689,19 +681,12 @@ GST_START_TEST (test_burst_client_bytes_with_keyframe)
/* push buffers in, 9 * 16 bytes = 144 bytes */
for (i = 0; i < 9; i++) {
gchar *data;
buffer = gst_buffer_new_and_alloc (16);
GstBuffer *buffer = gst_new_buffer (i);
/* mark most buffers as delta */
if (i != 0 && i != 4 && i != 8)
GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DELTA_UNIT);
/* copy some id */
data = gst_buffer_map (buffer, NULL, NULL, GST_MAP_WRITE);
g_snprintf (data, 16, "deadbee%08x", i);
gst_buffer_unmap (buffer, data, 16);
fail_unless (gst_pad_push (mysrcpad, buffer) == GST_FLOW_OK);
}
@ -718,16 +703,10 @@ GST_START_TEST (test_burst_client_bytes_with_keyframe)
/* push last buffer to make client fds ready for reading */
for (i = 9; i < 10; i++) {
gchar *data;
GstBuffer *buffer = gst_new_buffer (i);
buffer = gst_buffer_new_and_alloc (16);
GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DELTA_UNIT);
/* copy some id */
data = gst_buffer_map (buffer, NULL, NULL, GST_MAP_WRITE);
g_snprintf (data, 16, "deadbee%08x", i);
gst_buffer_unmap (buffer, data, 16);
fail_unless (gst_pad_push (mysrcpad, buffer) == GST_FLOW_OK);
}
@ -784,7 +763,6 @@ GST_END_TEST;
GST_START_TEST (test_client_next_keyframe)
{
GstElement *sink;
GstBuffer *buffer;
GstCaps *caps;
int pfd1[2];
gchar data[16];
@ -806,14 +784,7 @@ GST_START_TEST (test_client_next_keyframe)
/* push buffers in: keyframe, then non-keyframe */
for (i = 0; i < 2; i++) {
gchar *data;
buffer = gst_buffer_new_and_alloc (16);
/* copy some id */
data = gst_buffer_map (buffer, NULL, NULL, GST_MAP_WRITE);
g_snprintf (data, 16, "deadbee%08x", i);
gst_buffer_unmap (buffer, data, 16);
GstBuffer *buffer = gst_new_buffer (i);
if (i > 0)
GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DELTA_UNIT);