srt: Introduce SRT source and sink

SRT[0] is an open source transport technology[1] that optimizes
streaming performance across unpredictable networks.

Although SRT is based on UDP, it works like connection-oriented
protocol. However, it doesn't mean that the SRT server or client
is necessarily to link to a receiver or a sender so, here, the
pairs of source and sink elements are introduced.

 - srtserversink: SRT server to feed SRT stream
 - srtclientsrc:  SRT client to get SRT stream from srtserversink

 - srtclientsink: SRT client to send SRT stream
 - srtserversrc:  SRT server to listen from srtclientsink

[0] https://github.com/Haivision/srt
[1] http://www.srtalliance.org/

https://bugzilla.gnome.org/show_bug.cgi?id=785730
This commit is contained in:
Justin Kim 2017-07-31 14:38:34 +09:00 committed by Olivier Crête
parent 07819afda4
commit f78be9d698
19 changed files with 2926 additions and 0 deletions

View file

@ -2533,6 +2533,15 @@ AG_GST_CHECK_FEATURE(LIBMMS, [mms protocol library], libmms, [
])
AC_SUBST(LIBMMS_LIBS)
dnl *** libsrt ***
translit(dnm, m, l) AM_CONDITIONAL(USE_SRT, true)
AG_GST_CHECK_FEATURE(SRT, [srt library], srt, [
PKG_CHECK_MODULES(SRT, libsrt, HAVE_SRT="yes",
AG_GST_CHECK_LIBHEADER(SRT, srt, srt_startup, , srt/srt.h, SRT_LIBS="-lsrt")
)
AC_SUBST(SRT_LIBS)
AC_SUBST(SRT_CFLAGS)
])
dnl *** libsrtp ***
translit(dnm, m, l) AM_CONDITIONAL(USE_SRTP, true)
@ -3793,6 +3802,7 @@ ext/smoothstreaming/Makefile
ext/sndfile/Makefile
ext/soundtouch/Makefile
ext/spandsp/Makefile
ext/srt/Makefile
ext/srtp/Makefile
ext/teletextdec/Makefile
ext/gme/Makefile

View file

@ -328,6 +328,12 @@ else
SPC_DIR=
endif
if USE_SRT
SRT_DIR=srt
else
SRT_DIR=
endif
if USE_SRTP
SRTP_DIR=srtp
else
@ -457,6 +463,7 @@ SUBDIRS=\
$(SPANDSP_DIR) \
$(GME_DIR) \
$(SPC_DIR) \
$(SRT_DIR) \
$(SRTP_DIR) \
$(TELETEXTDEC_DIR) \
$(WILDMIDI_DIR) \
@ -522,6 +529,7 @@ DIST_SUBDIRS = \
soundtouch \
spandsp \
spc \
srt \
srtp \
gme \
teletextdec \

View file

@ -54,6 +54,7 @@ if cc.get_id() != 'msvc'
subdir('spandsp')
endif
#subdir('spc')
subdir('srt')
subdir('srtp')
#subdir('teletextdec')
#subdir('wildmidi')

40
ext/srt/Makefile.am Normal file
View file

@ -0,0 +1,40 @@
plugin_LTLIBRARIES = libgstsrt.la
libgstsrt_la_SOURCES = \
gstsrt.c \
gstsrtbasesrc.c \
gstsrtclientsrc.c \
gstsrtserversrc.c \
gstsrtbasesink.c \
gstsrtclientsink.c \
gstsrtserversink.c \
$(NULL)
libgstsrt_la_CFLAGS = \
$(GST_PLUGINS_BASE_CFLAGS) \
$(GST_CFLAGS) \
$(GIO_CFLAGS) \
$(SRT_CFLAGS) \
$(NULL)
libgstsrt_la_LIBADD = \
$(GST_PLUGINS_BASE_LIBS) \
$(GST_LIBS) \
$(GIO_LIBS) \
-lgstbase-1.0 \
$(SRT_LIBS) \
$(NULL)
libgstsrt_la_LDFLAGS = $(GST_PLUGIN_LDFLAGS)
CLEANFILES = $(BUILT_SOURCES)
noinst_HEADERS = \
gstsrtbasesink.h \
gstsrtclientsink.h \
gstsrtserversrc.h \
gstsrtbasesrc.h \
gstsrtclientsrc.h \
gstsrtserversink.h
include $(top_srcdir)/common/gst-glib-gen.mak

184
ext/srt/gstsrt.c Normal file
View file

@ -0,0 +1,184 @@
/* GStreamer
* Copyright (C) 2017, Collabora Ltd.
* Author:Justin Kim <justin.kim@collabora.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
* Boston, MA 02110-1301, USA.
*/
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#include "gstsrt.h"
#include "gstsrtclientsrc.h"
#include "gstsrtserversrc.h"
#include "gstsrtclientsink.h"
#include "gstsrtserversink.h"
#define GST_CAT_DEFAULT gst_debug_srt
GST_DEBUG_CATEGORY (GST_CAT_DEFAULT);
SRTSOCKET
gst_srt_client_connect (GstElement * elem, int sender,
const gchar * host, guint16 port, int rendez_vous,
const gchar * bind_address, guint16 bind_port, int latency,
GSocketAddress ** socket_address, gint * poll_id)
{
SRTSOCKET sock;
GError *error = NULL;
gpointer sa;
size_t sa_len;
*socket_address = g_inet_socket_address_new_from_string (host, port);
if (*socket_address == NULL) {
GST_ELEMENT_ERROR (elem, RESOURCE, OPEN_READ, ("Invalid host"),
("Failed to parse host"));
goto failed;
}
sa_len = g_socket_address_get_native_size (*socket_address);
sa = g_alloca (sa_len);
if (!g_socket_address_to_native (*socket_address, sa, sa_len, &error)) {
GST_ELEMENT_ERROR (elem, RESOURCE, OPEN_READ, ("Invalid address"),
("cannot resolve address (reason: %s)", error->message));
goto failed;
}
sock = srt_socket (g_socket_address_get_family (*socket_address), SOCK_DGRAM,
0);
if (sock == SRT_ERROR) {
GST_ELEMENT_ERROR (elem, LIBRARY, INIT, (NULL),
("failed to create SRT socket (reason: %s)", srt_getlasterror_str ()));
goto failed;
}
/* Make sure TSBPD mode is enable (SRT mode) */
srt_setsockopt (sock, 0, SRTO_TSBPDMODE, &(int) {
1}, sizeof (int));
/* This is a sink, we're always a receiver */
srt_setsockopt (sock, 0, SRTO_SENDER, &sender, sizeof (int));
srt_setsockopt (sock, 0, SRTO_TSBPDDELAY, &latency, sizeof (int));
srt_setsockopt (sock, 0, SRTO_RENDEZVOUS, &rendez_vous, sizeof (int));
if (bind_address || bind_port || rendez_vous) {
gpointer bsa;
size_t bsa_len;
GSocketAddress *b_socket_address = NULL;
if (bind_address == NULL)
bind_address = "0.0.0.0";
if (rendez_vous)
bind_port = port;
b_socket_address = g_inet_socket_address_new_from_string (bind_address,
bind_port);
if (b_socket_address == NULL) {
GST_ELEMENT_ERROR (elem, RESOURCE, OPEN_READ, ("Invalid bind address"),
("Failed to parse bind address: %s:%d", bind_address, bind_port));
goto failed;
}
bsa_len = g_socket_address_get_native_size (b_socket_address);
bsa = g_alloca (bsa_len);
if (!g_socket_address_to_native (b_socket_address, bsa, bsa_len, &error)) {
GST_ELEMENT_ERROR (elem, RESOURCE, OPEN_READ, ("Invalid bind address"),
("Can't parse bind address to sockaddr: %s", error->message));
g_clear_object (&b_socket_address);
goto failed;
}
g_clear_object (&b_socket_address);
if (srt_bind (sock, bsa, bsa_len) == SRT_ERROR) {
GST_ELEMENT_ERROR (elem, RESOURCE, OPEN_READ,
("Can't bind to address"),
("Can't bind to %s:%d (reason: %s)", bind_address, bind_port,
srt_getlasterror_str ()));
goto failed;
}
}
*poll_id = srt_epoll_create ();
if (*poll_id == -1) {
GST_ELEMENT_ERROR (elem, LIBRARY, INIT, (NULL),
("failed to create poll id for SRT socket (reason: %s)",
srt_getlasterror_str ()));
goto failed;
}
srt_epoll_add_usock (*poll_id, sock, &(int) {
SRT_EPOLL_OUT});
if (srt_connect (sock, sa, sa_len) == SRT_ERROR) {
GST_ELEMENT_ERROR (elem, RESOURCE, OPEN_READ, ("Connection error"),
("failed to connect to host (reason: %s)", srt_getlasterror_str ()));
goto failed;
}
return sock;
failed:
if (*poll_id != SRT_ERROR) {
srt_epoll_release (*poll_id);
*poll_id = SRT_ERROR;
}
if (sock != SRT_INVALID_SOCK) {
srt_close (sock);
sock = SRT_INVALID_SOCK;
}
g_clear_error (&error);
g_clear_object (socket_address);
return SRT_INVALID_SOCK;
}
static gboolean
plugin_init (GstPlugin * plugin)
{
GST_DEBUG_CATEGORY_INIT (GST_CAT_DEFAULT, "srt", 0, "SRT Common code");
if (!gst_element_register (plugin, "srtclientsrc", GST_RANK_PRIMARY,
GST_TYPE_SRT_CLIENT_SRC))
return FALSE;
if (!gst_element_register (plugin, "srtserversrc", GST_RANK_PRIMARY,
GST_TYPE_SRT_SERVER_SRC))
return FALSE;
if (!gst_element_register (plugin, "srtclientsink", GST_RANK_PRIMARY,
GST_TYPE_SRT_CLIENT_SINK))
return FALSE;
if (!gst_element_register (plugin, "srtserversink", GST_RANK_PRIMARY,
GST_TYPE_SRT_SERVER_SINK))
return FALSE;
return TRUE;
}
GST_PLUGIN_DEFINE (GST_VERSION_MAJOR,
GST_VERSION_MINOR,
srt,
"transfer data via SRT",
plugin_init, VERSION, GST_LICENSE, GST_PACKAGE_NAME, GST_PACKAGE_ORIGIN);

46
ext/srt/gstsrt.h Normal file
View file

@ -0,0 +1,46 @@
/* GStreamer
* Copyright (C) 2017, Collabora Ltd.
* Author: Olivier Crete <olivier.crete@collabora.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
* Boston, MA 02110-1301, USA.
*/
#ifndef __GST_SRT_H__
#define __GST_SRT_H__
#include <gst/gst.h>
#include <gio/gio.h>
#include <srt/srt.h>
#define SRT_URI_SCHEME "srt"
#define SRT_DEFAULT_PORT 7001
#define SRT_DEFAULT_HOST "127.0.0.1"
#define SRT_DEFAULT_URI SRT_URI_SCHEME"://"SRT_DEFAULT_HOST":"G_STRINGIFY(SRT_DEFAULT_PORT)
#define SRT_DEFAULT_LATENCY 125
G_BEGIN_DECLS
SRTSOCKET
gst_srt_client_connect (GstElement * elem, int sender,
const gchar * host, guint16 port, int rendez_vous,
const gchar * bind_address, guint16 bind_port, int latency,
GSocketAddress ** socket_address, gint * poll_id);
G_END_DECLS
#endif /* __GST_SRT_H__ */

302
ext/srt/gstsrtbasesink.c Normal file
View file

@ -0,0 +1,302 @@
/* GStreamer SRT plugin based on libsrt
* Copyright (C) 2017, Collabora Ltd.
* Author:Justin Kim <justin.kim@collabora.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
* Boston, MA 02110-1301, USA.
*/
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#include "gstsrtserversink.h"
#include "gstsrt.h"
#include <srt/srt.h>
#include <netinet/in.h>
#define SRT_DEFAULT_POLL_TIMEOUT -1
#define GST_CAT_DEFAULT gst_debug_srt_base_sink
GST_DEBUG_CATEGORY (GST_CAT_DEFAULT);
enum
{
PROP_URI = 1,
PROP_LATENCY,
/*< private > */
PROP_LAST
};
static GParamSpec *properties[PROP_LAST];
static void gst_srt_base_sink_uri_handler_init (gpointer g_iface,
gpointer iface_data);
static gchar *gst_srt_base_sink_uri_get_uri (GstURIHandler * handler);
static gboolean gst_srt_base_sink_uri_set_uri (GstURIHandler * handler,
const gchar * uri, GError ** error);
#define gst_srt_base_sink_parent_class parent_class
G_DEFINE_ABSTRACT_TYPE_WITH_CODE (GstSRTBaseSink, gst_srt_base_sink,
GST_TYPE_BASE_SINK,
G_IMPLEMENT_INTERFACE (GST_TYPE_URI_HANDLER,
gst_srt_base_sink_uri_handler_init)
GST_DEBUG_CATEGORY_INIT (GST_CAT_DEFAULT, "srtbasesink", 0,
"SRT Base Sink"));
static void
gst_srt_base_sink_get_property (GObject * object,
guint prop_id, GValue * value, GParamSpec * pspec)
{
GstSRTBaseSink *self = GST_SRT_BASE_SINK (object);
switch (prop_id) {
case PROP_URI:
if (self->uri != NULL) {
gchar *uri_str = gst_srt_base_sink_uri_get_uri (GST_URI_HANDLER (self));
g_value_take_string (value, uri_str);
}
break;
case PROP_LATENCY:
g_value_set_int (value, self->latency);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
}
}
static void
gst_srt_base_sink_set_property (GObject * object,
guint prop_id, const GValue * value, GParamSpec * pspec)
{
GstSRTBaseSink *self = GST_SRT_BASE_SINK (object);
switch (prop_id) {
case PROP_URI:
gst_srt_base_sink_uri_set_uri (GST_URI_HANDLER (self),
g_value_get_string (value), NULL);
break;
case PROP_LATENCY:
self->latency = g_value_get_int (value);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
}
}
static void
gst_srt_base_sink_finalize (GObject * object)
{
GstSRTBaseSink *self = GST_SRT_BASE_SINK (object);
g_clear_pointer (&self->uri, gst_uri_unref);
G_OBJECT_CLASS (parent_class)->finalize (object);
}
static GstFlowReturn
gst_srt_base_sink_render (GstBaseSink * sink, GstBuffer * buffer)
{
GstSRTBaseSink *self = GST_SRT_BASE_SINK (sink);
GstMapInfo info;
GstSRTBaseSinkClass *bclass = GST_SRT_BASE_SINK_GET_CLASS (sink);
GstFlowReturn ret = GST_FLOW_OK;
GST_TRACE_OBJECT (self, "sending buffer %p, offset %"
G_GINT64_FORMAT ", offset_end %" G_GINT64_FORMAT
", timestamp %" GST_TIME_FORMAT ", duration %" GST_TIME_FORMAT
", size %" G_GSIZE_FORMAT,
buffer, GST_BUFFER_OFFSET (buffer),
GST_BUFFER_OFFSET_END (buffer),
GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)),
GST_TIME_ARGS (GST_BUFFER_DURATION (buffer)),
gst_buffer_get_size (buffer));
if (!gst_buffer_map (buffer, &info, GST_MAP_READ)) {
GST_ELEMENT_ERROR (self, RESOURCE, READ,
("Could not map the input stream"), (NULL));
return GST_FLOW_ERROR;
}
if (!bclass->send_buffer (self, &info))
ret = GST_FLOW_ERROR;
gst_buffer_unmap (buffer, &info);
return ret;
}
static void
gst_srt_base_sink_class_init (GstSRTBaseSinkClass * klass)
{
GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
GstBaseSinkClass *gstbasesink_class = GST_BASE_SINK_CLASS (klass);
gobject_class->set_property = gst_srt_base_sink_set_property;
gobject_class->get_property = gst_srt_base_sink_get_property;
gobject_class->finalize = gst_srt_base_sink_finalize;
/**
* GstSRTBaseSink:uri:
*
* The URI used by SRT Connection.
*/
properties[PROP_URI] = g_param_spec_string ("uri", "URI",
"URI in the form of srt://address:port", SRT_DEFAULT_URI,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
properties[PROP_LATENCY] =
g_param_spec_int ("latency", "latency",
"Minimum latency (milliseconds)", 0,
G_MAXINT32, SRT_DEFAULT_LATENCY,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
g_object_class_install_properties (gobject_class, PROP_LAST, properties);
gstbasesink_class->render = GST_DEBUG_FUNCPTR (gst_srt_base_sink_render);
}
static void
gst_srt_base_sink_init (GstSRTBaseSink * self)
{
self->uri = gst_uri_from_string (SRT_DEFAULT_URI);
self->queued_buffers = NULL;
self->latency = SRT_DEFAULT_LATENCY;
}
static GstURIType
gst_srt_base_sink_uri_get_type (GType type)
{
return GST_URI_SINK;
}
static const gchar *const *
gst_srt_base_sink_uri_get_protocols (GType type)
{
static const gchar *protocols[] = { SRT_URI_SCHEME, NULL };
return protocols;
}
static gchar *
gst_srt_base_sink_uri_get_uri (GstURIHandler * handler)
{
gchar *uri_str;
GstSRTBaseSink *self = GST_SRT_BASE_SINK (handler);
GST_OBJECT_LOCK (self);
uri_str = gst_uri_to_string (self->uri);
GST_OBJECT_UNLOCK (self);
return uri_str;
}
static gboolean
gst_srt_base_sink_uri_set_uri (GstURIHandler * handler,
const gchar * uri, GError ** error)
{
GstSRTBaseSink *self = GST_SRT_BASE_SINK (handler);
gboolean ret = TRUE;
GstUri *parsed_uri = gst_uri_from_string (uri);
GST_TRACE_OBJECT (self, "Requested URI=%s", uri);
if (g_strcmp0 (gst_uri_get_scheme (parsed_uri), SRT_URI_SCHEME) != 0) {
g_set_error (error, GST_URI_ERROR, GST_URI_ERROR_BAD_URI,
"Invalid SRT URI scheme");
ret = FALSE;
goto out;
}
GST_OBJECT_LOCK (self);
g_clear_pointer (&self->uri, gst_uri_unref);
self->uri = gst_uri_ref (parsed_uri);
GST_OBJECT_UNLOCK (self);
out:
g_clear_pointer (&parsed_uri, gst_uri_unref);
return ret;
}
static void
gst_srt_base_sink_uri_handler_init (gpointer g_iface, gpointer iface_data)
{
GstURIHandlerInterface *iface = (GstURIHandlerInterface *) g_iface;
iface->get_type = gst_srt_base_sink_uri_get_type;
iface->get_protocols = gst_srt_base_sink_uri_get_protocols;
iface->get_uri = gst_srt_base_sink_uri_get_uri;
iface->set_uri = gst_srt_base_sink_uri_set_uri;
}
GstStructure *
gst_srt_base_sink_get_stats (GSocketAddress * sockaddr, SRTSOCKET sock)
{
SRT_TRACEBSTATS stats;
int ret;
GValue v = G_VALUE_INIT;
GstStructure *s;
if (sock == SRT_INVALID_SOCK || sockaddr == NULL)
return gst_structure_new_empty ("application/x-srt-statistics");
s = gst_structure_new ("application/x-srt-statistics",
"sockaddr", G_TYPE_SOCKET_ADDRESS, sockaddr, NULL);
ret = srt_bstats (sock, &stats, 0);
if (ret >= 0) {
gst_structure_set (s,
/* number of sent data packets, including retransmissions */
"packets-sent", G_TYPE_INT64, stats.pktSent,
/* number of lost packets (sender side) */
"packets-sent-lost", G_TYPE_INT, stats.pktSndLoss,
/* number of retransmitted packets */
"packets-retransmitted", G_TYPE_INT, stats.pktRetrans,
/* number of received ACK packets */
"packet-ack-received", G_TYPE_INT, stats.pktRecvACK,
/* number of received NAK packets */
"packet-nack-received", G_TYPE_INT, stats.pktRecvNAK,
/* time duration when UDT is sending data (idle time exclusive) */
"send-duration-us", G_TYPE_INT64, stats.usSndDuration,
/* number of sent data bytes, including retransmissions */
"bytes-sent", G_TYPE_UINT64, stats.byteSent,
/* number of retransmitted bytes */
"bytes-retransmitted", G_TYPE_UINT64, stats.byteRetrans,
/* number of too-late-to-send dropped bytes */
"bytes-sent-dropped", G_TYPE_UINT64, stats.byteSndDrop,
/* number of too-late-to-send dropped packets */
"packets-sent-dropped", G_TYPE_INT, stats.pktSndDrop,
/* sending rate in Mb/s */
"send-rate-mbps", G_TYPE_DOUBLE, stats.msRTT,
/* estimated bandwidth, in Mb/s */
"bandwidth-mbps", G_TYPE_DOUBLE, stats.mbpsBandwidth,
/* busy sending time (i.e., idle time exclusive) */
"send-duration-us", G_TYPE_UINT64, stats.usSndDuration,
"rtt-ms", G_TYPE_DOUBLE, stats.msRTT,
"negotiated-latency-ms", G_TYPE_INT, stats.msSndTsbPdDelay, NULL);
}
g_value_init (&v, G_TYPE_STRING);
g_value_take_string (&v,
g_socket_connectable_to_string (G_SOCKET_CONNECTABLE (sockaddr)));
gst_structure_take_value (s, "sockaddr-str", &v);
return s;
}

73
ext/srt/gstsrtbasesink.h Normal file
View file

@ -0,0 +1,73 @@
/* GStreamer
* Copyright (C) 2017, Collabora Ltd.
* Author:Justin Kim <justin.kim@collabora.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
* Boston, MA 02110-1301, USA.
*/
#ifndef __GST_SRT_BASE_SINK_H__
#define __GST_SRT_BASE_SINK_H__
#include <gst/gst.h>
#include <gst/base/gstbasesink.h>
#include <gio/gio.h>
#include <srt/srt.h>
G_BEGIN_DECLS
#define GST_TYPE_SRT_BASE_SINK (gst_srt_base_sink_get_type ())
#define GST_IS_SRT_BASE_SINK(obj) (G_TYPE_CHECK_INSTANCE_TYPE ((obj), GST_TYPE_SRT_BASE_SINK))
#define GST_IS_SRT_BASE_SINK_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE ((klass), GST_TYPE_SRT_BASE_SINK))
#define GST_SRT_BASE_SINK_GET_CLASS(obj) (G_TYPE_INSTANCE_GET_CLASS ((obj), GST_TYPE_SRT_BASE_SINK, GstSRTBaseSinkClass))
#define GST_SRT_BASE_SINK(obj) (G_TYPE_CHECK_INSTANCE_CAST ((obj), GST_TYPE_SRT_BASE_SINK, GstSRTBaseSink))
#define GST_SRT_BASE_SINK_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST ((klass), GST_TYPE_SRT_BASE_SINK, GstSRTBaseSinkClass))
#define GST_SRT_BASE_SINK_CAST(obj) ((GstSRTBaseSink*)(obj))
#define GST_SRT_BASE_SINK_CLASS_CAST(klass) ((GstSRTBaseSinkClass*)(klass))
typedef struct _GstSRTBaseSink GstSRTBaseSink;
typedef struct _GstSRTBaseSinkClass GstSRTBaseSinkClass;
struct _GstSRTBaseSink {
GstBaseSink parent;
GstUri *uri;
GList *queued_buffers;
gint latency;
/*< private >*/
gpointer _gst_reserved[GST_PADDING];
};
struct _GstSRTBaseSinkClass {
GstBaseSinkClass parent_class;
/* ask the subclass to send a buffer */
gboolean (*send_buffer) (GstSRTBaseSink *self, const GstMapInfo *mapinfo);
gpointer _gst_reserved[GST_PADDING_LARGE];
};
GST_EXPORT
GType gst_srt_base_sink_get_type (void);
GstStructure * gst_srt_base_sink_get_stats (GSocketAddress *sockaddr,
SRTSOCKET sock);
G_END_DECLS
#endif /* __GST_SRT_BASE_SINK_H__ */

262
ext/srt/gstsrtbasesrc.c Normal file
View file

@ -0,0 +1,262 @@
/* GStreamer SRT plugin based on libsrt
* Copyright (C) 2017, Collabora Ltd.
* Author:Justin Kim <justin.kim@collabora.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
* Boston, MA 02110-1301, USA.
*/
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#include "gstsrtbasesrc.h"
#include "gstsrt.h"
#include <srt/srt.h>
#include <gio/gio.h>
#include <netinet/in.h>
#define GST_CAT_DEFAULT gst_debug_srt_base_src
GST_DEBUG_CATEGORY (GST_CAT_DEFAULT);
enum
{
PROP_URI = 1,
PROP_CAPS,
PROP_LATENCY,
/*< private > */
PROP_LAST
};
static GParamSpec *properties[PROP_LAST];
static void gst_srt_base_src_uri_handler_init (gpointer g_iface,
gpointer iface_data);
static gchar *gst_srt_base_src_uri_get_uri (GstURIHandler * handler);
static gboolean gst_srt_base_src_uri_set_uri (GstURIHandler * handler,
const gchar * uri, GError ** error);
#define gst_srt_base_src_parent_class parent_class
G_DEFINE_ABSTRACT_TYPE_WITH_CODE (GstSRTBaseSrc, gst_srt_base_src,
GST_TYPE_PUSH_SRC, G_IMPLEMENT_INTERFACE (GST_TYPE_URI_HANDLER,
gst_srt_base_src_uri_handler_init)
GST_DEBUG_CATEGORY_INIT (GST_CAT_DEFAULT, "srtbasesrc", 0,
"SRT Base Source"));
static void
gst_srt_base_src_get_property (GObject * object,
guint prop_id, GValue * value, GParamSpec * pspec)
{
GstSRTBaseSrc *self = GST_SRT_BASE_SRC (object);
switch (prop_id) {
case PROP_URI:
if (self->uri != NULL) {
gchar *uri_str = gst_srt_base_src_uri_get_uri (GST_URI_HANDLER (self));
g_value_take_string (value, uri_str);
}
break;
case PROP_CAPS:
GST_OBJECT_LOCK (self);
gst_value_set_caps (value, self->caps);
GST_OBJECT_UNLOCK (self);
break;
case PROP_LATENCY:
g_value_set_int (value, self->latency);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
}
}
static void
gst_srt_base_src_set_property (GObject * object,
guint prop_id, const GValue * value, GParamSpec * pspec)
{
GstSRTBaseSrc *self = GST_SRT_BASE_SRC (object);
switch (prop_id) {
case PROP_URI:
gst_srt_base_src_uri_set_uri (GST_URI_HANDLER (self),
g_value_get_string (value), NULL);
break;
case PROP_CAPS:
GST_OBJECT_LOCK (self);
g_clear_pointer (&self->caps, gst_caps_unref);
self->caps = gst_caps_copy (gst_value_get_caps (value));
GST_OBJECT_UNLOCK (self);
break;
case PROP_LATENCY:
self->latency = g_value_get_int (value);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
}
}
static void
gst_srt_base_src_finalize (GObject * object)
{
GstSRTBaseSrc *self = GST_SRT_BASE_SRC (object);
g_clear_pointer (&self->uri, gst_uri_unref);
g_clear_pointer (&self->caps, gst_caps_unref);
G_OBJECT_CLASS (parent_class)->finalize (object);
}
static GstCaps *
gst_srt_base_src_get_caps (GstBaseSrc * src, GstCaps * filter)
{
GstSRTBaseSrc *self = GST_SRT_BASE_SRC (src);
GstCaps *result, *caps = NULL;
GST_OBJECT_LOCK (self);
if (self->caps != NULL) {
caps = gst_caps_ref (self->caps);
}
GST_OBJECT_UNLOCK (self);
if (caps) {
if (filter) {
result = gst_caps_intersect_full (filter, caps, GST_CAPS_INTERSECT_FIRST);
gst_caps_unref (caps);
} else {
result = caps;
}
} else {
result = (filter) ? gst_caps_ref (filter) : gst_caps_new_any ();
}
return result;
}
static void
gst_srt_base_src_class_init (GstSRTBaseSrcClass * klass)
{
GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
GstBaseSrcClass *gstbasesrc_class = GST_BASE_SRC_CLASS (klass);
gobject_class->set_property = gst_srt_base_src_set_property;
gobject_class->get_property = gst_srt_base_src_get_property;
gobject_class->finalize = gst_srt_base_src_finalize;
/**
* GstSRTBaseSrc:uri:
*
* The URI used by SRT Connection.
*/
properties[PROP_URI] = g_param_spec_string ("uri", "URI",
"URI in the form of srt://address:port", SRT_DEFAULT_URI,
G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY | G_PARAM_STATIC_STRINGS);
/**
* GstSRTBaseSrc:caps:
*
* The Caps used by the source pad.
*/
properties[PROP_CAPS] =
g_param_spec_boxed ("caps", "Caps", "The caps of the source pad",
GST_TYPE_CAPS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
properties[PROP_LATENCY] =
g_param_spec_int ("latency", "latency",
"Minimum latency (milliseconds)", 0,
G_MAXINT32, SRT_DEFAULT_LATENCY,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
g_object_class_install_properties (gobject_class, PROP_LAST, properties);
gstbasesrc_class->get_caps = GST_DEBUG_FUNCPTR (gst_srt_base_src_get_caps);
}
static void
gst_srt_base_src_init (GstSRTBaseSrc * self)
{
gst_srt_base_src_uri_set_uri (GST_URI_HANDLER (self), SRT_DEFAULT_URI, NULL);
gst_base_src_set_format (GST_BASE_SRC (self), GST_FORMAT_TIME);
gst_base_src_set_live (GST_BASE_SRC (self), TRUE);
self->latency = SRT_DEFAULT_LATENCY;
}
static GstURIType
gst_srt_base_src_uri_get_type (GType type)
{
return GST_URI_SRC;
}
static const gchar *const *
gst_srt_base_src_uri_get_protocols (GType type)
{
static const gchar *protocols[] = { SRT_URI_SCHEME, NULL };
return protocols;
}
static gchar *
gst_srt_base_src_uri_get_uri (GstURIHandler * handler)
{
gchar *uri_str;
GstSRTBaseSrc *self = GST_SRT_BASE_SRC (handler);
GST_OBJECT_LOCK (self);
uri_str = gst_uri_to_string (self->uri);
GST_OBJECT_UNLOCK (self);
return uri_str;
}
static gboolean
gst_srt_base_src_uri_set_uri (GstURIHandler * handler,
const gchar * uri, GError ** error)
{
GstSRTBaseSrc *self = GST_SRT_BASE_SRC (handler);
gboolean ret = TRUE;
GstUri *parsed_uri = gst_uri_from_string (uri);
if (g_strcmp0 (gst_uri_get_scheme (parsed_uri), SRT_URI_SCHEME) != 0) {
g_set_error (error, GST_URI_ERROR, GST_URI_ERROR_BAD_URI,
"Invalid SRT URI scheme");
ret = FALSE;
goto out;
}
GST_OBJECT_LOCK (self);
g_clear_pointer (&self->uri, gst_uri_unref);
self->uri = gst_uri_ref (parsed_uri);
GST_OBJECT_UNLOCK (self);
out:
g_clear_pointer (&parsed_uri, gst_uri_unref);
return ret;
}
static void
gst_srt_base_src_uri_handler_init (gpointer g_iface, gpointer iface_data)
{
GstURIHandlerInterface *iface = (GstURIHandlerInterface *) g_iface;
iface->get_type = gst_srt_base_src_uri_get_type;
iface->get_protocols = gst_srt_base_src_uri_get_protocols;
iface->get_uri = gst_srt_base_src_uri_get_uri;
iface->set_uri = gst_srt_base_src_uri_set_uri;
}

63
ext/srt/gstsrtbasesrc.h Normal file
View file

@ -0,0 +1,63 @@
/* GStreamer
* Copyright (C) 2017, Collabora Ltd.
* Author:Justin Kim <justin.kim@collabora.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
* Boston, MA 02110-1301, USA.
*/
#ifndef __GST_SRT_BASE_SRC_H__
#define __GST_SRT_BASE_SRC_H__
#include <gst/gst.h>
#include <gst/base/gstpushsrc.h>
G_BEGIN_DECLS
#define GST_TYPE_SRT_BASE_SRC (gst_srt_base_src_get_type ())
#define GST_IS_SRT_BASE_SRC(obj) (G_TYPE_CHECK_INSTANCE_TYPE ((obj), GST_TYPE_SRT_BASE_SRC))
#define GST_IS_SRT_BASE_SRC_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE ((klass), GST_TYPE_SRT_BASE_SRC))
#define GST_SRT_BASE_SRC_GET_CLASS(obj) (G_TYPE_INSTANCE_GET_CLASS ((obj), GST_TYPE_SRT_BASE_SRC, GstSRTBaseSrcClass))
#define GST_SRT_BASE_SRC(obj) (G_TYPE_CHECK_INSTANCE_CAST ((obj), GST_TYPE_SRT_BASE_SRC, GstSRTBaseSrc))
#define GST_SRT_BASE_SRC_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST ((klass), GST_TYPE_SRT_BASE_SRC, GstSRTBaseSrcClass))
#define GST_SRT_BASE_SRC_CAST(obj) ((GstSRTBaseSrc*)(obj))
#define GST_SRT_BASE_SRC_CLASS_CAST(klass) ((GstSRTBaseSrcClass*)(klass))
typedef struct _GstSRTBaseSrc GstSRTBaseSrc;
typedef struct _GstSRTBaseSrcClass GstSRTBaseSrcClass;
struct _GstSRTBaseSrc {
GstPushSrc parent;
GstUri *uri;
GstCaps *caps;
gint latency;
/*< private >*/
gpointer _gst_reserved[GST_PADDING];
};
struct _GstSRTBaseSrcClass {
GstPushSrcClass parent_class;
gpointer _gst_reserved[GST_PADDING_LARGE];
};
GST_EXPORT
GType gst_srt_base_src_get_type (void);
G_END_DECLS
#endif /* __GST_SRT_BASE_SRC_H__ */

268
ext/srt/gstsrtclientsink.c Normal file
View file

@ -0,0 +1,268 @@
/* GStreamer SRT plugin based on libsrt
* Copyright (C) 2017, Collabora Ltd.
* Author:Justin Kim <justin.kim@collabora.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
* Boston, MA 02110-1301, USA.
*/
/**
* SECTION:element-srtserversink
* @title: srtserversink
*
* srtserversink is a network sink that sends <ulink url="http://www.srtalliance.org/">SRT</ulink>
* packets to the network. Although SRT is an UDP-based protocol, srtserversink works like
* a server socket of connection-oriented protocol.
*
* <refsect2>
* <title>Examples</title>
* |[
* gst-launch-1.0 -v audiotestsrc ! srtserversink
* ]| This pipeline shows how to serve SRT packets through the default port.
* |[
* gst-launch-1.0 -v audiotestsrc ! srtserversink uri=srt://192.168.1.10:8888/ rendez-vous=1
* ]| This pipeline shows how to serve SRT packets to 192.168.1.10 port 8888 using the rendez-vous mode.
* </refsect2>
*
*/
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#include "gstsrtclientsink.h"
#include "gstsrt.h"
#include <srt/srt.h>
#include <gio/gio.h>
#define SRT_DEFAULT_POLL_TIMEOUT -1
static GstStaticPadTemplate sink_template = GST_STATIC_PAD_TEMPLATE ("sink",
GST_PAD_SINK,
GST_PAD_ALWAYS,
GST_STATIC_CAPS_ANY);
#define GST_CAT_DEFAULT gst_debug_srt_client_sink
GST_DEBUG_CATEGORY (GST_CAT_DEFAULT);
struct _GstSRTClientSinkPrivate
{
SRTSOCKET sock;
GSocketAddress *sockaddr;
gint poll_id;
gint poll_timeout;
gboolean rendez_vous;
gchar *bind_address;
guint16 bind_port;
};
#define GST_SRT_CLIENT_SINK_GET_PRIVATE(obj) \
(G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_SRT_CLIENT_SINK, GstSRTClientSinkPrivate))
enum
{
PROP_POLL_TIMEOUT = 1,
PROP_BIND_ADDRESS,
PROP_BIND_PORT,
PROP_RENDEZ_VOUS,
PROP_STATS,
/*< private > */
PROP_LAST
};
static GParamSpec *properties[PROP_LAST];
#define gst_srt_client_sink_parent_class parent_class
G_DEFINE_TYPE_WITH_CODE (GstSRTClientSink, gst_srt_client_sink,
GST_TYPE_SRT_BASE_SINK, G_ADD_PRIVATE (GstSRTClientSink)
GST_DEBUG_CATEGORY_INIT (GST_CAT_DEFAULT, "srtclientsink", 0,
"SRT Client Sink"));
static void
gst_srt_client_sink_get_property (GObject * object,
guint prop_id, GValue * value, GParamSpec * pspec)
{
GstSRTClientSink *self = GST_SRT_CLIENT_SINK (object);
GstSRTClientSinkPrivate *priv = GST_SRT_CLIENT_SINK_GET_PRIVATE (self);
switch (prop_id) {
case PROP_POLL_TIMEOUT:
g_value_set_int (value, priv->poll_timeout);
break;
case PROP_BIND_PORT:
g_value_set_int (value, priv->rendez_vous);
break;
case PROP_BIND_ADDRESS:
g_value_set_string (value, priv->bind_address);
break;
case PROP_RENDEZ_VOUS:
g_value_set_boolean (value, priv->bind_port);
break;
case PROP_STATS:
g_value_take_boxed (value, gst_srt_base_sink_get_stats (priv->sockaddr,
priv->sock));
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
}
}
static void
gst_srt_client_sink_set_property (GObject * object,
guint prop_id, const GValue * value, GParamSpec * pspec)
{
GstSRTClientSink *self = GST_SRT_CLIENT_SINK (object);
GstSRTClientSinkPrivate *priv = GST_SRT_CLIENT_SINK_GET_PRIVATE (self);
switch (prop_id) {
case PROP_POLL_TIMEOUT:
priv->poll_timeout = g_value_get_int (value);
break;
case PROP_BIND_ADDRESS:
g_free (priv->bind_address);
priv->bind_address = g_value_dup_string (value);
break;
case PROP_BIND_PORT:
priv->bind_port = g_value_get_int (value);
break;
case PROP_RENDEZ_VOUS:
priv->rendez_vous = g_value_get_boolean (value);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
}
}
static gboolean
gst_srt_client_sink_start (GstBaseSink * sink)
{
GstSRTClientSink *self = GST_SRT_CLIENT_SINK (sink);
GstSRTClientSinkPrivate *priv = GST_SRT_CLIENT_SINK_GET_PRIVATE (self);
GstSRTBaseSink *base = GST_SRT_BASE_SINK (sink);
GstUri *uri = gst_uri_ref (GST_SRT_BASE_SINK (self)->uri);
priv->sock = gst_srt_client_connect (GST_ELEMENT (sink), FALSE,
gst_uri_get_host (uri), gst_uri_get_port (uri), priv->rendez_vous,
priv->bind_address, priv->bind_port, base->latency,
&priv->sockaddr, &priv->poll_id);
g_clear_pointer (&uri, gst_uri_unref);
return (priv->sock != SRT_INVALID_SOCK);
}
static gboolean
gst_srt_client_sink_send_buffer (GstSRTBaseSink * sink,
const GstMapInfo * mapinfo)
{
GstSRTClientSink *self = GST_SRT_CLIENT_SINK (sink);
GstSRTClientSinkPrivate *priv = GST_SRT_CLIENT_SINK_GET_PRIVATE (self);
if (srt_sendmsg2 (priv->sock, (char *) mapinfo->data, mapinfo->size,
0) == SRT_ERROR) {
GST_ELEMENT_ERROR (self, RESOURCE, WRITE, NULL,
("%s", srt_getlasterror_str ()));
return FALSE;
}
return TRUE;
}
static gboolean
gst_srt_client_sink_stop (GstBaseSink * sink)
{
GstSRTClientSink *self = GST_SRT_CLIENT_SINK (sink);
GstSRTClientSinkPrivate *priv = GST_SRT_CLIENT_SINK_GET_PRIVATE (self);
GST_DEBUG_OBJECT (self, "closing SRT connection");
if (priv->poll_id != SRT_ERROR) {
srt_epoll_remove_usock (priv->poll_id, priv->sock);
srt_epoll_release (priv->poll_id);
priv->poll_id = SRT_ERROR;
}
if (priv->sock != SRT_INVALID_SOCK) {
srt_close (priv->sock);
priv->sock = SRT_INVALID_SOCK;
}
g_clear_object (&priv->sockaddr);
return TRUE;
}
static void
gst_srt_client_sink_class_init (GstSRTClientSinkClass * klass)
{
GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
GstBaseSinkClass *gstbasesink_class = GST_BASE_SINK_CLASS (klass);
GstSRTBaseSinkClass *gstsrtbasesink_class = GST_SRT_BASE_SINK_CLASS (klass);
gobject_class->set_property = gst_srt_client_sink_set_property;
gobject_class->get_property = gst_srt_client_sink_get_property;
properties[PROP_POLL_TIMEOUT] =
g_param_spec_int ("poll-timeout", "Poll Timeout",
"Return poll wait after timeout miliseconds (-1 = infinite)", -1,
G_MAXINT32, SRT_DEFAULT_POLL_TIMEOUT,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
properties[PROP_BIND_ADDRESS] =
g_param_spec_string ("bind-address", "Bind Address",
"Address to bind socket to (required for rendez-vous mode) ", NULL,
G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY | G_PARAM_STATIC_STRINGS);
properties[PROP_BIND_PORT] =
g_param_spec_int ("bind-port", "Bind Port",
"Port to bind socket to (Ignored in rendez-vous mode)", 0,
G_MAXUINT16, 0,
G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY | G_PARAM_STATIC_STRINGS);
properties[PROP_RENDEZ_VOUS] =
g_param_spec_boolean ("rendez-vous", "Rendez Vous",
"Work in Rendez-Vous mode instead of client/caller mode", FALSE,
G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY | G_PARAM_STATIC_STRINGS);
properties[PROP_STATS] = g_param_spec_boxed ("stats", "Statistics",
"SRT Statistics", GST_TYPE_STRUCTURE,
G_PARAM_READABLE | G_PARAM_STATIC_STRINGS);
g_object_class_install_properties (gobject_class, PROP_LAST, properties);
gst_element_class_add_static_pad_template (gstelement_class, &sink_template);
gst_element_class_set_metadata (gstelement_class,
"SRT client sink", "Sink/Network",
"Send data over the network via SRT",
"Justin Kim <justin.kim@collabora.com>");
gstbasesink_class->start = GST_DEBUG_FUNCPTR (gst_srt_client_sink_start);
gstbasesink_class->stop = GST_DEBUG_FUNCPTR (gst_srt_client_sink_stop);
gstsrtbasesink_class->send_buffer =
GST_DEBUG_FUNCPTR (gst_srt_client_sink_send_buffer);
}
static void
gst_srt_client_sink_init (GstSRTClientSink * self)
{
GstSRTClientSinkPrivate *priv = GST_SRT_CLIENT_SINK_GET_PRIVATE (self);
priv->poll_timeout = SRT_DEFAULT_POLL_TIMEOUT;
}

View file

@ -0,0 +1,59 @@
/* GStreamer
* Copyright (C) 2017, Collabora Ltd.
* Author:Justin Kim <justin.kim@collabora.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
* Boston, MA 02110-1301, USA.
*/
#ifndef __GST_SRT_CLIENT_SINK_H__
#define __GST_SRT_CLIENT_SINK_H__
#include "gstsrtbasesink.h"
G_BEGIN_DECLS
#define GST_TYPE_SRT_CLIENT_SINK (gst_srt_client_sink_get_type ())
#define GST_IS_SRT_CLIENT_SINK(obj) (G_TYPE_CHECK_INSTANCE_TYPE ((obj), GST_TYPE_SRT_CLIENT_SINK))
#define GST_IS_SRT_CLIENT_SINK_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE ((klass), GST_TYPE_SRT_CLIENT_SINK))
#define GST_SRT_CLIENT_SINK_GET_CLASS(obj) (G_TYPE_INSTANCE_GET_CLASS ((obj), GST_TYPE_SRT_CLIENT_SINK, GstSRTClientSinkClass))
#define GST_SRT_CLIENT_SINK(obj) (G_TYPE_CHECK_INSTANCE_CAST ((obj), GST_TYPE_SRT_CLIENT_SINK, GstSRTClientSink))
#define GST_SRT_CLIENT_SINK_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST ((klass), GST_TYPE_SRT_CLIENT_SINK, GstSRTClientSinkClass))
#define GST_SRT_CLIENT_SINK_CAST(obj) ((GstSRTClientSink*)(obj))
#define GST_SRT_CLIENT_SINK_CLASS_CAST(klass) ((GstSRTClientSinkClass*)(klass))
typedef struct _GstSRTClientSink GstSRTClientSink;
typedef struct _GstSRTClientSinkClass GstSRTClientSinkClass;
typedef struct _GstSRTClientSinkPrivate GstSRTClientSinkPrivate;
struct _GstSRTClientSink {
GstSRTBaseSink parent;
/*< private >*/
gpointer _gst_reserved[GST_PADDING];
};
struct _GstSRTClientSinkClass {
GstSRTBaseSinkClass parent_class;
gpointer _gst_reserved[GST_PADDING_LARGE];
};
GST_EXPORT
GType gst_srt_client_sink_get_type (void);
G_END_DECLS
#endif /* __GST_SRT_CLIENT_SINK_H__ */

339
ext/srt/gstsrtclientsrc.c Normal file
View file

@ -0,0 +1,339 @@
/* GStreamer SRT plugin based on libsrt
* Copyright (C) 2017, Collabora Ltd.
* Author:Justin Kim <justin.kim@collabora.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
* Boston, MA 02110-1301, USA.
*/
/**
* SECTION:element-srtclientsrc
* @title: srtclientsrc
*
* srtclientsrc is a network source that reads <ulink url="http://www.srtalliance.org/">SRT</ulink>
* packets from the network. Although SRT is a protocol based on UDP, srtclientsrc works like
* a client socket of connection-oriented protocol.
*
* <refsect2>
* <title>Examples</title>
* |[
* gst-launch-1.0 -v srtclientsrc uri="srt://127.0.0.1:7001" ! fakesink
* ]| This pipeline shows how to connect SRT server by setting #GstSRTClientSrc:uri property.
*
* |[
* gst-launch-1.0 -v srtclientsrc uri="srt://192.168.1.10:7001" rendez-vous ! fakesink
* ]| This pipeline shows how to connect SRT server by setting #GstSRTClientSrc:uri property and using the rendez-vous mode.
* </refsect2>
*
*/
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#include "gstsrtclientsrc.h"
#include <srt/srt.h>
#include <gio/gio.h>
#include "gstsrt.h"
#include <netinet/in.h>
static GstStaticPadTemplate src_template = GST_STATIC_PAD_TEMPLATE ("src",
GST_PAD_SRC,
GST_PAD_ALWAYS,
GST_STATIC_CAPS_ANY);
#define GST_CAT_DEFAULT gst_debug_srt_client_src
GST_DEBUG_CATEGORY (GST_CAT_DEFAULT);
struct _GstSRTClientSrcPrivate
{
SRTSOCKET sock;
gint poll_id;
gint poll_timeout;
gboolean rendez_vous;
gchar *bind_address;
guint16 bind_port;
};
#define GST_SRT_CLIENT_SRC_GET_PRIVATE(obj) \
(G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_SRT_CLIENT_SRC, GstSRTClientSrcPrivate))
#define SRT_DEFAULT_POLL_TIMEOUT -1
enum
{
PROP_POLL_TIMEOUT = 1,
PROP_BIND_ADDRESS,
PROP_BIND_PORT,
PROP_RENDEZ_VOUS,
/*< private > */
PROP_LAST
};
static GParamSpec *properties[PROP_LAST + 1];
#define gst_srt_client_src_parent_class parent_class
G_DEFINE_TYPE_WITH_CODE (GstSRTClientSrc, gst_srt_client_src,
GST_TYPE_SRT_BASE_SRC, G_ADD_PRIVATE (GstSRTClientSrc)
GST_DEBUG_CATEGORY_INIT (GST_CAT_DEFAULT, "srtclientsrc", 0,
"SRT Client Source"));
static void
gst_srt_client_src_get_property (GObject * object,
guint prop_id, GValue * value, GParamSpec * pspec)
{
GstSRTClientSrc *self = GST_SRT_CLIENT_SRC (object);
GstSRTClientSrcPrivate *priv = GST_SRT_CLIENT_SRC_GET_PRIVATE (self);
switch (prop_id) {
case PROP_POLL_TIMEOUT:
g_value_set_int (value, priv->poll_timeout);
break;
case PROP_BIND_PORT:
g_value_set_int (value, priv->rendez_vous);
break;
case PROP_BIND_ADDRESS:
g_value_set_string (value, priv->bind_address);
break;
case PROP_RENDEZ_VOUS:
g_value_set_boolean (value, priv->bind_port);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
}
}
static void
gst_srt_client_src_set_property (GObject * object,
guint prop_id, const GValue * value, GParamSpec * pspec)
{
GstSRTBaseSrc *self = GST_SRT_BASE_SRC (object);
GstSRTClientSrcPrivate *priv = GST_SRT_CLIENT_SRC_GET_PRIVATE (self);
switch (prop_id) {
case PROP_POLL_TIMEOUT:
priv->poll_timeout = g_value_get_int (value);
break;
case PROP_BIND_ADDRESS:
g_free (priv->bind_address);
priv->bind_address = g_value_dup_string (value);
break;
case PROP_BIND_PORT:
priv->bind_port = g_value_get_int (value);
break;
case PROP_RENDEZ_VOUS:
priv->rendez_vous = g_value_get_boolean (value);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
}
}
static void
gst_srt_client_src_finalize (GObject * object)
{
GstSRTClientSrc *self = GST_SRT_CLIENT_SRC (object);
GstSRTClientSrcPrivate *priv = GST_SRT_CLIENT_SRC_GET_PRIVATE (self);
if (priv->poll_id != SRT_ERROR) {
srt_epoll_release (priv->poll_id);
priv->poll_id = SRT_ERROR;
}
if (priv->sock != SRT_INVALID_SOCK) {
srt_close (priv->sock);
priv->sock = SRT_INVALID_SOCK;
}
g_free (priv->bind_address);
G_OBJECT_CLASS (parent_class)->finalize (object);
}
static GstFlowReturn
gst_srt_client_src_fill (GstPushSrc * src, GstBuffer * outbuf)
{
GstSRTClientSrc *self = GST_SRT_CLIENT_SRC (src);
GstSRTClientSrcPrivate *priv = GST_SRT_CLIENT_SRC_GET_PRIVATE (self);
GstFlowReturn ret = GST_FLOW_OK;
GstMapInfo info;
SRTSOCKET ready[2];
gint recv_len;
if (srt_epoll_wait (priv->poll_id, 0, 0, ready, &(int) {
2}, priv->poll_timeout, 0, 0, 0, 0) == -1) {
/* Assuming that timeout error is normal */
if (srt_getlasterror (NULL) != SRT_ETIMEOUT) {
GST_ELEMENT_ERROR (src, RESOURCE, READ,
(NULL), ("srt_epoll_wait error: %s", srt_getlasterror_str ()));
ret = GST_FLOW_ERROR;
}
srt_clearlasterror ();
goto out;
}
if (!gst_buffer_map (outbuf, &info, GST_MAP_WRITE)) {
GST_ELEMENT_ERROR (src, RESOURCE, READ,
("Could not map the buffer for writing "), (NULL));
ret = GST_FLOW_ERROR;
goto out;
}
recv_len = srt_recvmsg (priv->sock, (char *) info.data,
gst_buffer_get_size (outbuf));
gst_buffer_unmap (outbuf, &info);
if (recv_len == SRT_ERROR) {
GST_ELEMENT_ERROR (src, RESOURCE, READ,
(NULL), ("srt_recvmsg error: %s", srt_getlasterror_str ()));
ret = GST_FLOW_ERROR;
goto out;
} else if (recv_len == 0) {
ret = GST_FLOW_EOS;
goto out;
}
GST_BUFFER_PTS (outbuf) =
gst_clock_get_time (GST_ELEMENT_CLOCK (src)) -
GST_ELEMENT_CAST (src)->base_time;
gst_buffer_resize (outbuf, 0, recv_len);
GST_LOG_OBJECT (src,
"filled buffer from _get of size %" G_GSIZE_FORMAT ", ts %"
GST_TIME_FORMAT ", dur %" GST_TIME_FORMAT
", offset %" G_GINT64_FORMAT ", offset_end %" G_GINT64_FORMAT,
gst_buffer_get_size (outbuf),
GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (outbuf)),
GST_TIME_ARGS (GST_BUFFER_DURATION (outbuf)),
GST_BUFFER_OFFSET (outbuf), GST_BUFFER_OFFSET_END (outbuf));
out:
return ret;
}
static gboolean
gst_srt_client_src_start (GstBaseSrc * src)
{
GstSRTClientSrc *self = GST_SRT_CLIENT_SRC (src);
GstSRTClientSrcPrivate *priv = GST_SRT_CLIENT_SRC_GET_PRIVATE (self);
GstSRTBaseSrc *base = GST_SRT_BASE_SRC (src);
GstUri *uri = gst_uri_ref (base->uri);
GSocketAddress *socket_address = NULL;
priv->sock = gst_srt_client_connect (GST_ELEMENT (src), FALSE,
gst_uri_get_host (uri), gst_uri_get_port (uri), priv->rendez_vous,
priv->bind_address, priv->bind_port, base->latency,
&socket_address, &priv->poll_id);
g_clear_object (&socket_address);
g_clear_pointer (&uri, gst_uri_unref);
return (priv->sock != SRT_INVALID_SOCK);
}
static gboolean
gst_srt_client_src_stop (GstBaseSrc * src)
{
GstSRTClientSrc *self = GST_SRT_CLIENT_SRC (src);
GstSRTClientSrcPrivate *priv = GST_SRT_CLIENT_SRC_GET_PRIVATE (self);
if (priv->poll_id != SRT_ERROR) {
if (priv->sock != SRT_INVALID_SOCK)
srt_epoll_remove_usock (priv->poll_id, priv->sock);
srt_epoll_release (priv->poll_id);
}
priv->poll_id = SRT_ERROR;
GST_DEBUG_OBJECT (self, "closing SRT connection");
if (priv->sock != SRT_INVALID_SOCK)
srt_close (priv->sock);
priv->sock = SRT_INVALID_SOCK;
return TRUE;
}
static void
gst_srt_client_src_class_init (GstSRTClientSrcClass * klass)
{
GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
GstBaseSrcClass *gstbasesrc_class = GST_BASE_SRC_CLASS (klass);
GstPushSrcClass *gstpushsrc_class = GST_PUSH_SRC_CLASS (klass);
gobject_class->set_property = gst_srt_client_src_set_property;
gobject_class->get_property = gst_srt_client_src_get_property;
gobject_class->finalize = gst_srt_client_src_finalize;
/**
* GstSRTClientSrc:poll-timeout:
*
* The timeout(ms) value when polling SRT socket.
*/
properties[PROP_POLL_TIMEOUT] =
g_param_spec_int ("poll-timeout", "Poll timeout",
"Return poll wait after timeout miliseconds (-1 = infinite)", -1,
G_MAXINT32, SRT_DEFAULT_POLL_TIMEOUT,
G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY | G_PARAM_STATIC_STRINGS);
properties[PROP_BIND_ADDRESS] =
g_param_spec_string ("bind-address", "Bind Address",
"Address to bind socket to (required for rendez-vous mode) ", NULL,
G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY | G_PARAM_STATIC_STRINGS);
properties[PROP_BIND_PORT] =
g_param_spec_int ("bind-port", "Bind Port",
"Port to bind socket to (Ignored in rendez-vous mode)", 0,
G_MAXUINT16, 0,
G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY | G_PARAM_STATIC_STRINGS);
properties[PROP_RENDEZ_VOUS] =
g_param_spec_boolean ("rendez-vous", "Rendez Vous",
"Work in Rendez-Vous mode instead of client/caller mode", FALSE,
G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY | G_PARAM_STATIC_STRINGS);
g_object_class_install_properties (gobject_class, PROP_LAST, properties);
gst_element_class_add_static_pad_template (gstelement_class, &src_template);
gst_element_class_set_metadata (gstelement_class,
"SRT client source", "Source/Network",
"Receive data over the network via SRT",
"Justin Kim <justin.kim@collabora.com>");
gstbasesrc_class->start = GST_DEBUG_FUNCPTR (gst_srt_client_src_start);
gstbasesrc_class->stop = GST_DEBUG_FUNCPTR (gst_srt_client_src_stop);
gstpushsrc_class->fill = GST_DEBUG_FUNCPTR (gst_srt_client_src_fill);
}
static void
gst_srt_client_src_init (GstSRTClientSrc * self)
{
GstSRTClientSrcPrivate *priv = GST_SRT_CLIENT_SRC_GET_PRIVATE (self);
priv->sock = SRT_INVALID_SOCK;
priv->poll_id = SRT_ERROR;
priv->poll_timeout = SRT_DEFAULT_POLL_TIMEOUT;
priv->rendez_vous = FALSE;
priv->bind_address = NULL;
priv->bind_port = 0;
}

59
ext/srt/gstsrtclientsrc.h Normal file
View file

@ -0,0 +1,59 @@
/* GStreamer
* Copyright (C) 2017, Collabora Ltd.
* Author:Justin Kim <justin.kim@collabora.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
* Boston, MA 02110-1301, USA.
*/
#ifndef __GST_SRT_CLIENT_SRC_H__
#define __GST_SRT_CLIENT_SRC_H__
#include "gstsrtbasesrc.h"
G_BEGIN_DECLS
#define GST_TYPE_SRT_CLIENT_SRC (gst_srt_client_src_get_type ())
#define GST_IS_SRT_CLIENT_SRC(obj) (G_TYPE_CHECK_INSTANCE_TYPE ((obj), GST_TYPE_SRT_CLIENT_SRC))
#define GST_IS_SRT_CLIENT_SRC_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE ((klass), GST_TYPE_SRT_CLIENT_SRC))
#define GST_SRT_CLIENT_SRC_GET_CLASS(obj) (G_TYPE_INSTANCE_GET_CLASS ((obj), GST_TYPE_SRT_CLIENT_SRC, GstSRTClientSrcClass))
#define GST_SRT_CLIENT_SRC(obj) (G_TYPE_CHECK_INSTANCE_CAST ((obj), GST_TYPE_SRT_CLIENT_SRC, GstSRTClientSrc))
#define GST_SRT_CLIENT_SRC_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST ((klass), GST_TYPE_SRT_CLIENT_SRC, GstSRTClientSrcClass))
#define GST_SRT_CLIENT_SRC_CAST(obj) ((GstSRTClientSrc*)(obj))
#define GST_SRT_CLIENT_SRC_CLASS_CAST(klass) ((GstSRTClientSrcClass*)(klass))
typedef struct _GstSRTClientSrc GstSRTClientSrc;
typedef struct _GstSRTClientSrcClass GstSRTClientSrcClass;
typedef struct _GstSRTClientSrcPrivate GstSRTClientSrcPrivate;
struct _GstSRTClientSrc {
GstSRTBaseSrc parent;
/*< private >*/
gpointer _gst_reserved[GST_PADDING];
};
struct _GstSRTClientSrcClass {
GstSRTBaseSrcClass parent_class;
gpointer _gst_reserved[GST_PADDING_LARGE];
};
GST_EXPORT
GType gst_srt_client_src_get_type (void);
G_END_DECLS
#endif /* __GST_SRT_CLIENT_SRC_H__ */

555
ext/srt/gstsrtserversink.c Normal file
View file

@ -0,0 +1,555 @@
/* GStreamer SRT plugin based on libsrt
* Copyright (C) 2017, Collabora Ltd.
* Author:Justin Kim <justin.kim@collabora.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
* Boston, MA 02110-1301, USA.
*/
/**
* SECTION:element-srtserversink
* @title: srtserversink
*
* srtserversink is a network sink that sends <ulink url="http://www.srtalliance.org/">SRT</ulink>
* packets to the network. Although SRT is an UDP-based protocol, srtserversink works like
* a server socket of connection-oriented protocol.
*
* <refsect2>
* <title>Examples</title>
* |[
* gst-launch-1.0 -v audiotestsrc ! srtserversink
* ]| This pipeline shows how to serve SRT packets through the default port.
* </refsect2>
*
*/
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#include "gstsrtserversink.h"
#include "gstsrt.h"
#include <srt/srt.h>
#include <gio/gio.h>
#define SRT_DEFAULT_POLL_TIMEOUT -1
static GstStaticPadTemplate sink_template = GST_STATIC_PAD_TEMPLATE ("sink",
GST_PAD_SINK,
GST_PAD_ALWAYS,
GST_STATIC_CAPS_ANY);
#define GST_CAT_DEFAULT gst_debug_srt_server_sink
GST_DEBUG_CATEGORY (GST_CAT_DEFAULT);
struct _GstSRTServerSinkPrivate
{
gboolean cancelled;
SRTSOCKET sock;
gint poll_id;
gint poll_timeout;
GMainLoop *loop;
GMainContext *context;
GSource *server_source;
GThread *thread;
GList *clients;
};
#define GST_SRT_SERVER_SINK_GET_PRIVATE(obj) \
(G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_SRT_SERVER_SINK, GstSRTServerSinkPrivate))
enum
{
PROP_POLL_TIMEOUT = 1,
PROP_STATS,
/*< private > */
PROP_LAST
};
static GParamSpec *properties[PROP_LAST];
enum
{
SIG_CLIENT_ADDED,
SIG_CLIENT_REMOVED,
LAST_SIGNAL
};
static guint signals[LAST_SIGNAL] = { 0 };
#define gst_srt_server_sink_parent_class parent_class
G_DEFINE_TYPE_WITH_CODE (GstSRTServerSink, gst_srt_server_sink,
GST_TYPE_SRT_BASE_SINK, G_ADD_PRIVATE (GstSRTServerSink)
GST_DEBUG_CATEGORY_INIT (GST_CAT_DEFAULT, "srtserversink", 0,
"SRT Server Sink"));
typedef struct
{
int sock;
GSocketAddress *sockaddr;
} SRTClient;
static SRTClient *
srt_client_new (void)
{
SRTClient *client = g_new0 (SRTClient, 1);
client->sock = SRT_INVALID_SOCK;
return client;
}
static void
srt_client_free (SRTClient * client)
{
g_return_if_fail (client != NULL);
g_clear_object (&client->sockaddr);
if (client->sock != SRT_INVALID_SOCK) {
srt_close (client->sock);
}
g_free (client);
}
static void
srt_emit_client_removed (SRTClient * client, gpointer user_data)
{
GstSRTServerSink *self = GST_SRT_SERVER_SINK (user_data);
g_return_if_fail (client != NULL && GST_IS_SRT_SERVER_SINK (self));
g_signal_emit (self, signals[SIG_CLIENT_REMOVED], 0, client->sock,
client->sockaddr);
}
static void
gst_srt_server_sink_get_property (GObject * object,
guint prop_id, GValue * value, GParamSpec * pspec)
{
GstSRTServerSink *self = GST_SRT_SERVER_SINK (object);
GstSRTServerSinkPrivate *priv = GST_SRT_SERVER_SINK_GET_PRIVATE (self);
switch (prop_id) {
case PROP_POLL_TIMEOUT:
g_value_set_int (value, priv->poll_timeout);
break;
case PROP_STATS:
{
GList *item;
GST_OBJECT_LOCK (self);
for (item = priv->clients; item; item = item->next) {
SRTClient *client = item->data;
GValue tmp = G_VALUE_INIT;
g_value_init (&tmp, GST_TYPE_STRUCTURE);
g_value_take_boxed (&tmp, gst_srt_base_sink_get_stats (client->sockaddr,
client->sock));
gst_value_array_append_and_take_value (value, &tmp);
}
GST_OBJECT_UNLOCK (self);
break;
}
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
}
}
static void
gst_srt_server_sink_set_property (GObject * object,
guint prop_id, const GValue * value, GParamSpec * pspec)
{
GstSRTServerSink *self = GST_SRT_SERVER_SINK (object);
GstSRTServerSinkPrivate *priv = GST_SRT_SERVER_SINK_GET_PRIVATE (self);
switch (prop_id) {
case PROP_POLL_TIMEOUT:
priv->poll_timeout = g_value_get_int (value);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
}
}
static gboolean
idle_listen_callback (gpointer data)
{
GstSRTServerSink *self = GST_SRT_SERVER_SINK (data);
GstSRTServerSinkPrivate *priv = GST_SRT_SERVER_SINK_GET_PRIVATE (self);
gboolean ret = TRUE;
SRTClient *client;
SRTSOCKET ready[2];
struct sockaddr sa;
int sa_len;
if (srt_epoll_wait (priv->poll_id, ready, &(int) {
2}, 0, 0, priv->poll_timeout, 0, 0, 0, 0) == -1) {
int srt_errno = srt_getlasterror (NULL);
if (srt_errno != SRT_ETIMEOUT) {
GST_ELEMENT_ERROR (self, RESOURCE, FAILED,
("SRT error: %s", srt_getlasterror_str ()), (NULL));
ret = FALSE;
goto out;
}
/* Mimicking cancellable */
if (srt_errno == SRT_ETIMEOUT && priv->cancelled) {
GST_DEBUG_OBJECT (self, "Cancelled waiting for client");
ret = FALSE;
goto out;
}
}
client = srt_client_new ();
client->sock = srt_accept (priv->sock, &sa, &sa_len);
if (client->sock == SRT_INVALID_SOCK) {
GST_WARNING_OBJECT (self, "detected invalid SRT client socket (reason: %s)",
srt_getlasterror_str ());
srt_clearlasterror ();
srt_client_free (client);
ret = FALSE;
goto out;
}
client->sockaddr = g_socket_address_new_from_native (&sa, sa_len);
GST_OBJECT_LOCK (self);
priv->clients = g_list_append (priv->clients, client);
GST_OBJECT_UNLOCK (self);
g_signal_emit (self, signals[SIG_CLIENT_ADDED], 0, client->sock,
client->sockaddr);
GST_DEBUG_OBJECT (self, "client added");
out:
return ret;
}
static gpointer
thread_func (gpointer data)
{
GstSRTServerSink *self = GST_SRT_SERVER_SINK (data);
GstSRTServerSinkPrivate *priv = GST_SRT_SERVER_SINK_GET_PRIVATE (self);
g_main_loop_run (priv->loop);
return NULL;
}
static gboolean
gst_srt_server_sink_start (GstBaseSink * sink)
{
GstSRTServerSink *self = GST_SRT_SERVER_SINK (sink);
GstSRTServerSinkPrivate *priv = GST_SRT_SERVER_SINK_GET_PRIVATE (self);
GstSRTBaseSink *base = GST_SRT_BASE_SINK (sink);
GstUri *uri = gst_uri_ref (GST_SRT_BASE_SINK (self)->uri);
GSocketAddress *socket_address = NULL;
GError *error = NULL;
gboolean ret = TRUE;
struct sockaddr sa;
size_t sa_len;
const gchar *host;
int lat = base->latency;
if (gst_uri_get_port (uri) == GST_URI_NO_PORT) {
GST_ELEMENT_ERROR (sink, RESOURCE, OPEN_WRITE, NULL, (("Invalid port")));
return FALSE;
}
host = gst_uri_get_host (uri);
if (host == NULL) {
GInetAddress *any = g_inet_address_new_any (G_SOCKET_FAMILY_IPV4);
socket_address = g_inet_socket_address_new (any, gst_uri_get_port (uri));
g_object_unref (any);
} else {
socket_address =
g_inet_socket_address_new_from_string (host, gst_uri_get_port (uri));
}
if (socket_address == NULL) {
GST_WARNING_OBJECT (self,
"failed to extract host or port from the given URI");
goto failed;
}
sa_len = g_socket_address_get_native_size (socket_address);
if (!g_socket_address_to_native (socket_address, &sa, sa_len, &error)) {
GST_WARNING_OBJECT (self, "cannot resolve address (reason: %s)",
error->message);
goto failed;
}
priv->sock = srt_socket (sa.sa_family, SOCK_DGRAM, 0);
if (priv->sock == SRT_INVALID_SOCK) {
GST_WARNING_OBJECT (self, "failed to create SRT socket (reason: %s)",
srt_getlasterror_str ());
goto failed;
}
/* Make SRT non-blocking */
srt_setsockopt (priv->sock, 0, SRTO_SNDSYN, &(int) {
0}, sizeof (int));
/* Make sure TSBPD mode is enable (SRT mode) */
srt_setsockopt (priv->sock, 0, SRTO_TSBPDMODE, &(int) {
1}, sizeof (int));
/* This is a sink, we're always a sender */
srt_setsockopt (priv->sock, 0, SRTO_SENDER, &(int) {
1}, sizeof (int));
srt_setsockopt (priv->sock, 0, SRTO_TSBPDDELAY, &lat, sizeof (int));
priv->poll_id = srt_epoll_create ();
if (priv->poll_id == -1) {
GST_WARNING_OBJECT (self,
"failed to create poll id for SRT socket (reason: %s)",
srt_getlasterror_str ());
goto failed;
}
srt_epoll_add_usock (priv->poll_id, priv->sock, &(int) {
SRT_EPOLL_IN});
if (srt_bind (priv->sock, &sa, sa_len) == SRT_ERROR) {
GST_WARNING_OBJECT (self, "failed to bind SRT server socket (reason: %s)",
srt_getlasterror_str ());
goto failed;
}
if (srt_listen (priv->sock, 1) == SRT_ERROR) {
GST_WARNING_OBJECT (self, "failed to listen SRT socket (reason: %s)",
srt_getlasterror_str ());
goto failed;
}
priv->context = g_main_context_new ();
priv->server_source = g_idle_source_new ();
g_source_set_callback (priv->server_source,
(GSourceFunc) idle_listen_callback, gst_object_ref (self),
(GDestroyNotify) gst_object_unref);
g_source_attach (priv->server_source, priv->context);
priv->loop = g_main_loop_new (priv->context, TRUE);
priv->thread = g_thread_try_new ("srtserversink", thread_func, self, &error);
if (error != NULL) {
GST_WARNING_OBJECT (self, "failed to create thread (reason: %s)",
error->message);
ret = FALSE;
}
g_clear_pointer (&uri, gst_uri_unref);
g_clear_object (&socket_address);
return ret;
failed:
if (priv->poll_id != SRT_ERROR) {
srt_epoll_release (priv->poll_id);
priv->poll_id = SRT_ERROR;
}
if (priv->sock != SRT_INVALID_SOCK) {
srt_close (priv->sock);
priv->sock = SRT_INVALID_SOCK;
}
g_clear_error (&error);
g_clear_pointer (&uri, gst_uri_unref);
g_clear_object (&socket_address);
return FALSE;
}
static gboolean
gst_srt_server_sink_send_buffer (GstSRTBaseSink * sink,
const GstMapInfo * mapinfo)
{
GstSRTServerSink *self = GST_SRT_SERVER_SINK (sink);
GstSRTServerSinkPrivate *priv = GST_SRT_SERVER_SINK_GET_PRIVATE (self);
GList *clients = priv->clients;
GST_OBJECT_LOCK (sink);
while (clients != NULL) {
SRTClient *client = clients->data;
clients = clients->next;
if (srt_sendmsg2 (client->sock, (char *) mapinfo->data, mapinfo->size,
0) == SRT_ERROR) {
GST_WARNING_OBJECT (self, "%s", srt_getlasterror_str ());
priv->clients = g_list_remove (priv->clients, client);
GST_OBJECT_UNLOCK (sink);
g_signal_emit (self, signals[SIG_CLIENT_REMOVED], 0, client->sock,
client->sockaddr);
srt_client_free (client);
GST_OBJECT_LOCK (sink);
}
}
GST_OBJECT_UNLOCK (sink);
return TRUE;
}
static gboolean
gst_srt_server_sink_stop (GstBaseSink * sink)
{
GstSRTServerSink *self = GST_SRT_SERVER_SINK (sink);
GstSRTServerSinkPrivate *priv = GST_SRT_SERVER_SINK_GET_PRIVATE (self);
gboolean ret = TRUE;
GList *clients;
GST_DEBUG_OBJECT (self, "closing client sockets");
GST_OBJECT_LOCK (sink);
clients = priv->clients;
priv->clients = NULL;
GST_OBJECT_UNLOCK (sink);
g_list_foreach (clients, (GFunc) srt_emit_client_removed, self);
g_list_free_full (clients, (GDestroyNotify) srt_client_free);
GST_DEBUG_OBJECT (self, "closing SRT connection");
srt_epoll_remove_usock (priv->poll_id, priv->sock);
srt_epoll_release (priv->poll_id);
srt_close (priv->sock);
if (priv->loop) {
g_main_loop_quit (priv->loop);
g_thread_join (priv->thread);
g_clear_pointer (&priv->loop, g_main_loop_unref);
g_clear_pointer (&priv->thread, g_thread_unref);
}
if (priv->server_source) {
g_source_destroy (priv->server_source);
g_clear_pointer (&priv->server_source, g_source_unref);
}
g_clear_pointer (&priv->context, g_main_context_unref);
return ret;
}
static gboolean
gst_srt_server_sink_unlock (GstBaseSink * sink)
{
GstSRTServerSink *self = GST_SRT_SERVER_SINK (sink);
GstSRTServerSinkPrivate *priv = GST_SRT_SERVER_SINK_GET_PRIVATE (self);
priv->cancelled = TRUE;
return TRUE;
}
static gboolean
gst_srt_server_sink_unlock_stop (GstBaseSink * sink)
{
GstSRTServerSink *self = GST_SRT_SERVER_SINK (sink);
GstSRTServerSinkPrivate *priv = GST_SRT_SERVER_SINK_GET_PRIVATE (self);
priv->cancelled = FALSE;
return TRUE;
}
static void
gst_srt_server_sink_class_init (GstSRTServerSinkClass * klass)
{
GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
GstBaseSinkClass *gstbasesink_class = GST_BASE_SINK_CLASS (klass);
GstSRTBaseSinkClass *gstsrtbasesink_class = GST_SRT_BASE_SINK_CLASS (klass);
gobject_class->set_property = gst_srt_server_sink_set_property;
gobject_class->get_property = gst_srt_server_sink_get_property;
properties[PROP_POLL_TIMEOUT] =
g_param_spec_int ("poll-timeout", "Poll Timeout",
"Return poll wait after timeout miliseconds (-1 = infinite)", -1,
G_MAXINT32, SRT_DEFAULT_POLL_TIMEOUT,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
properties[PROP_STATS] = gst_param_spec_array ("stats", "Statistics",
"Array of GstStructures containing SRT statistics",
g_param_spec_boxed ("stats", "Statistics",
"Statistics for one client", GST_TYPE_STRUCTURE,
G_PARAM_READABLE | G_PARAM_STATIC_STRINGS),
G_PARAM_READABLE | G_PARAM_STATIC_STRINGS);
g_object_class_install_properties (gobject_class, PROP_LAST, properties);
/**
* GstSRTServerSink::client-added:
* @gstsrtserversink: the srtserversink element that emitted this signal
* @sock: the client socket descriptor that was added to srtserversink
* @addr: the pointer of "struct sockaddr" that describes the @sock
* @addr_len: the length of @addr
*
* The given socket descriptor was added to srtserversink.
*/
signals[SIG_CLIENT_ADDED] =
g_signal_new ("client-added", G_TYPE_FROM_CLASS (klass),
G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstSRTServerSinkClass, client_added),
NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE,
2, G_TYPE_INT, G_TYPE_SOCKET_ADDRESS);
/**
* GstSRTServerSink::client-removed:
* @gstsrtserversink: the srtserversink element that emitted this signal
* @sock: the client socket descriptor that was added to srtserversink
* @addr: the pointer of "struct sockaddr" that describes the @sock
* @addr_len: the length of @addr
*
* The given socket descriptor was removed from srtserversink.
*/
signals[SIG_CLIENT_REMOVED] =
g_signal_new ("client-removed", G_TYPE_FROM_CLASS (klass),
G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstSRTServerSinkClass,
client_removed), NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE,
2, G_TYPE_INT, G_TYPE_SOCKET_ADDRESS);
gst_element_class_add_static_pad_template (gstelement_class, &sink_template);
gst_element_class_set_metadata (gstelement_class,
"SRT server sink", "Sink/Network",
"Send data over the network via SRT",
"Justin Kim <justin.kim@collabora.com>");
gstbasesink_class->start = GST_DEBUG_FUNCPTR (gst_srt_server_sink_start);
gstbasesink_class->stop = GST_DEBUG_FUNCPTR (gst_srt_server_sink_stop);
gstbasesink_class->unlock = GST_DEBUG_FUNCPTR (gst_srt_server_sink_unlock);
gstbasesink_class->unlock_stop =
GST_DEBUG_FUNCPTR (gst_srt_server_sink_unlock_stop);
gstsrtbasesink_class->send_buffer =
GST_DEBUG_FUNCPTR (gst_srt_server_sink_send_buffer);
}
static void
gst_srt_server_sink_init (GstSRTServerSink * self)
{
GstSRTServerSinkPrivate *priv = GST_SRT_SERVER_SINK_GET_PRIVATE (self);
priv->poll_timeout = SRT_DEFAULT_POLL_TIMEOUT;
}

View file

@ -0,0 +1,63 @@
/* GStreamer
* Copyright (C) 2017, Collabora Ltd.
* Author:Justin Kim <justin.kim@collabora.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
* Boston, MA 02110-1301, USA.
*/
#ifndef __GST_SRT_SERVER_SINK_H__
#define __GST_SRT_SERVER_SINK_H__
#include "gstsrtbasesink.h"
#include <sys/socket.h>
G_BEGIN_DECLS
#define GST_TYPE_SRT_SERVER_SINK (gst_srt_server_sink_get_type ())
#define GST_IS_SRT_SERVER_SINK(obj) (G_TYPE_CHECK_INSTANCE_TYPE ((obj), GST_TYPE_SRT_SERVER_SINK))
#define GST_IS_SRT_SERVER_SINK_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE ((klass), GST_TYPE_SRT_SERVER_SINK))
#define GST_SRT_SERVER_SINK_GET_CLASS(obj) (G_TYPE_INSTANCE_GET_CLASS ((obj), GST_TYPE_SRT_SERVER_SINK, GstSRTServerSinkClass))
#define GST_SRT_SERVER_SINK(obj) (G_TYPE_CHECK_INSTANCE_CAST ((obj), GST_TYPE_SRT_SERVER_SINK, GstSRTServerSink))
#define GST_SRT_SERVER_SINK_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST ((klass), GST_TYPE_SRT_SERVER_SINK, GstSRTServerSinkClass))
#define GST_SRT_SERVER_SINK_CAST(obj) ((GstSRTServerSink*)(obj))
#define GST_SRT_SERVER_SINK_CLASS_CAST(klass) ((GstSRTServerSinkClass*)(klass))
typedef struct _GstSRTServerSink GstSRTServerSink;
typedef struct _GstSRTServerSinkClass GstSRTServerSinkClass;
typedef struct _GstSRTServerSinkPrivate GstSRTServerSinkPrivate;
struct _GstSRTServerSink {
GstSRTBaseSink parent;
/*< private >*/
gpointer _gst_reserved[GST_PADDING];
};
struct _GstSRTServerSinkClass {
GstSRTBaseSinkClass parent_class;
void (*client_added) (GstSRTServerSink *self, int sock, struct sockaddr *addr, int addr_len);
void (*client_removed) (GstSRTServerSink *self, int sock, struct sockaddr *addr, int addr_len);
gpointer _gst_reserved[GST_PADDING_LARGE];
};
GST_EXPORT
GType gst_srt_server_sink_get_type (void);
G_END_DECLS
#endif /* __GST_SRT_SERVER_SINK_H__ */

504
ext/srt/gstsrtserversrc.c Normal file
View file

@ -0,0 +1,504 @@
/* GStreamer SRT plugin based on libsrt
* Copyright (C) 2017, Collabora Ltd.
* Author:Justin Kim <justin.kim@collabora.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
* Boston, MA 02110-1301, USA.
*/
/**
* SECTION:element-srtserversrc
* @title: srtserversrc
*
* srtserversrc is a network source that reads <ulink url="http://www.srtalliance.org/">SRT</ulink>
* packets from the network. Although SRT is a protocol based on UDP, srtserversrc works like
* a server socket of connection-oriented protocol, but it accepts to only one client connection.
*
* <refsect2>
* <title>Examples</title>
* |[
* gst-launch-1.0 -v srtserversrc uri="srt://:7001" ! fakesink
* ]| This pipeline shows how to bind SRT server by setting #GstSRTServerSrc:uri property.
* </refsect2>
*
*/
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#include "gstsrtserversrc.h"
#include "gstsrt.h"
#include <srt/srt.h>
#include <gio/gio.h>
#define SRT_DEFAULT_POLL_TIMEOUT 100
static GstStaticPadTemplate src_template = GST_STATIC_PAD_TEMPLATE ("src",
GST_PAD_SRC,
GST_PAD_ALWAYS,
GST_STATIC_CAPS_ANY);
#define GST_CAT_DEFAULT gst_debug_srt_server_src
GST_DEBUG_CATEGORY (GST_CAT_DEFAULT);
struct _GstSRTServerSrcPrivate
{
SRTSOCKET sock;
SRTSOCKET client_sock;
GSocketAddress *client_sockaddr;
gint poll_id;
gint poll_timeout;
gboolean has_client;
gboolean cancelled;
};
#define GST_SRT_SERVER_SRC_GET_PRIVATE(obj) \
(G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_SRT_SERVER_SRC, GstSRTServerSrcPrivate))
enum
{
PROP_POLL_TIMEOUT = 1,
/*< private > */
PROP_LAST
};
static GParamSpec *properties[PROP_LAST];
enum
{
SIG_CLIENT_ADDED,
SIG_CLIENT_CLOSED,
LAST_SIGNAL
};
static guint signals[LAST_SIGNAL] = { 0 };
#define gst_srt_server_src_parent_class parent_class
G_DEFINE_TYPE_WITH_CODE (GstSRTServerSrc, gst_srt_server_src,
GST_TYPE_SRT_BASE_SRC, G_ADD_PRIVATE (GstSRTServerSrc)
GST_DEBUG_CATEGORY_INIT (GST_CAT_DEFAULT, "srtserversrc", 0,
"SRT Server Source"));
static void
gst_srt_server_src_get_property (GObject * object,
guint prop_id, GValue * value, GParamSpec * pspec)
{
GstSRTServerSrc *self = GST_SRT_SERVER_SRC (object);
GstSRTServerSrcPrivate *priv = GST_SRT_SERVER_SRC_GET_PRIVATE (self);
switch (prop_id) {
case PROP_POLL_TIMEOUT:
g_value_set_int (value, priv->poll_timeout);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
}
}
static void
gst_srt_server_src_set_property (GObject * object,
guint prop_id, const GValue * value, GParamSpec * pspec)
{
GstSRTServerSrc *self = GST_SRT_SERVER_SRC (object);
GstSRTServerSrcPrivate *priv = GST_SRT_SERVER_SRC_GET_PRIVATE (self);
switch (prop_id) {
case PROP_POLL_TIMEOUT:
priv->poll_timeout = g_value_get_int (value);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
}
}
static void
gst_srt_server_src_finalize (GObject * object)
{
GstSRTServerSrc *self = GST_SRT_SERVER_SRC (object);
GstSRTServerSrcPrivate *priv = GST_SRT_SERVER_SRC_GET_PRIVATE (self);
if (priv->poll_id != SRT_ERROR) {
srt_epoll_release (priv->poll_id);
priv->poll_id = SRT_ERROR;
}
if (priv->sock != SRT_ERROR) {
srt_close (priv->sock);
priv->sock = SRT_ERROR;
}
G_OBJECT_CLASS (parent_class)->finalize (object);
}
static GstFlowReturn
gst_srt_server_src_fill (GstPushSrc * src, GstBuffer * outbuf)
{
GstSRTServerSrc *self = GST_SRT_SERVER_SRC (src);
GstSRTServerSrcPrivate *priv = GST_SRT_SERVER_SRC_GET_PRIVATE (self);
GstFlowReturn ret = GST_FLOW_OK;
GstMapInfo info;
SRTSOCKET ready[2];
gint recv_len;
struct sockaddr client_sa;
size_t client_sa_len;
while (!priv->has_client) {
GST_DEBUG_OBJECT (self, "poll wait (timeout: %d)", priv->poll_timeout);
/* Make SRT server socket non-blocking */
srt_setsockopt (priv->sock, 0, SRTO_SNDSYN, &(int) {
0}, sizeof (int));
if (srt_epoll_wait (priv->poll_id, ready, &(int) {
2}, 0, 0, priv->poll_timeout, 0, 0, 0, 0) == -1) {
int srt_errno = srt_getlasterror (NULL);
/* Assuming that timeout error is normal */
if (srt_errno != SRT_ETIMEOUT) {
GST_ELEMENT_ERROR (src, RESOURCE, FAILED,
("SRT error: %s", srt_getlasterror_str ()), (NULL));
return GST_FLOW_ERROR;
}
/* Mimicking cancellable */
if (srt_errno == SRT_ETIMEOUT && priv->cancelled) {
GST_DEBUG_OBJECT (self, "Cancelled waiting for client");
return GST_FLOW_FLUSHING;
}
continue;
}
priv->client_sock =
srt_accept (priv->sock, &client_sa, (int *) &client_sa_len);
GST_DEBUG_OBJECT (self, "checking client sock");
if (priv->client_sock == SRT_INVALID_SOCK) {
GST_WARNING_OBJECT (self,
"detected invalid SRT client socket (reason: %s)",
srt_getlasterror_str ());
srt_clearlasterror ();
} else {
priv->has_client = TRUE;
g_clear_object (&priv->client_sockaddr);
priv->client_sockaddr = g_socket_address_new_from_native (&client_sa,
client_sa_len);
g_signal_emit (self, signals[SIG_CLIENT_ADDED], 0,
priv->client_sock, priv->client_sockaddr);
}
}
GST_DEBUG_OBJECT (self, "filling buffer");
if (!gst_buffer_map (outbuf, &info, GST_MAP_WRITE)) {
GST_ELEMENT_ERROR (src, RESOURCE, WRITE,
("Could not map the output stream"), (NULL));
ret = GST_FLOW_ERROR;
goto out;
}
recv_len = srt_recvmsg (priv->client_sock, (char *) info.data,
gst_buffer_get_size (outbuf));
gst_buffer_unmap (outbuf, &info);
if (recv_len == SRT_ERROR) {
GST_WARNING_OBJECT (self, "%s", srt_getlasterror_str ());
g_signal_emit (self, signals[SIG_CLIENT_CLOSED], 0,
priv->client_sock, priv->client_sockaddr);
srt_close (priv->client_sock);
priv->client_sock = SRT_INVALID_SOCK;
g_clear_object (&priv->client_sockaddr);
priv->has_client = FALSE;
gst_buffer_resize (outbuf, 0, 0);
ret = GST_FLOW_OK;
goto out;
} else if (recv_len == 0) {
ret = GST_FLOW_EOS;
goto out;
}
GST_BUFFER_PTS (outbuf) =
gst_clock_get_time (GST_ELEMENT_CLOCK (src)) -
GST_ELEMENT_CAST (src)->base_time;
gst_buffer_resize (outbuf, 0, recv_len);
GST_LOG_OBJECT (src,
"filled buffer from _get of size %" G_GSIZE_FORMAT ", ts %"
GST_TIME_FORMAT ", dur %" GST_TIME_FORMAT
", offset %" G_GINT64_FORMAT ", offset_end %" G_GINT64_FORMAT,
gst_buffer_get_size (outbuf),
GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (outbuf)),
GST_TIME_ARGS (GST_BUFFER_DURATION (outbuf)),
GST_BUFFER_OFFSET (outbuf), GST_BUFFER_OFFSET_END (outbuf));
out:
return ret;
}
static gboolean
gst_srt_server_src_start (GstBaseSrc * src)
{
GstSRTServerSrc *self = GST_SRT_SERVER_SRC (src);
GstSRTServerSrcPrivate *priv = GST_SRT_SERVER_SRC_GET_PRIVATE (self);
GstSRTBaseSrc *base = GST_SRT_BASE_SRC (src);
GstUri *uri = gst_uri_ref (base->uri);
GError *error = NULL;
struct sockaddr sa;
size_t sa_len;
GSocketAddress *socket_address;
const gchar *host;
int lat = base->latency;
if (gst_uri_get_port (uri) == GST_URI_NO_PORT) {
GST_ELEMENT_ERROR (src, RESOURCE, OPEN_WRITE, NULL, (("Invalid port")));
return FALSE;
}
host = gst_uri_get_host (uri);
if (host == NULL) {
GInetAddress *any = g_inet_address_new_any (G_SOCKET_FAMILY_IPV4);
socket_address = g_inet_socket_address_new (any, gst_uri_get_port (uri));
g_object_unref (any);
} else {
socket_address =
g_inet_socket_address_new_from_string (host, gst_uri_get_port (uri));
}
if (socket_address == NULL) {
GST_ELEMENT_ERROR (self, RESOURCE, OPEN_READ, ("Invalid URI"),
("failed to extract host or port from the given URI"));
goto failed;
}
sa_len = g_socket_address_get_native_size (socket_address);
if (!g_socket_address_to_native (socket_address, &sa, sa_len, &error)) {
GST_ELEMENT_ERROR (self, RESOURCE, OPEN_READ, ("Invalid URI"),
("cannot resolve address (reason: %s)", error->message));
goto failed;
}
priv->sock = srt_socket (sa.sa_family, SOCK_DGRAM, 0);
if (priv->sock == SRT_ERROR) {
GST_ELEMENT_ERROR (self, LIBRARY, INIT, (NULL),
("failed to create poll id for SRT socket (reason: %s)",
srt_getlasterror_str ()));
goto failed;
}
/* Make sure TSBPD mode is enable (SRT mode) */
srt_setsockopt (priv->sock, 0, SRTO_TSBPDMODE, &(int) {
1}, sizeof (int));
/* This is a sink, we're always a receiver */
srt_setsockopt (priv->sock, 0, SRTO_SENDER, &(int) {
0}, sizeof (int));
srt_setsockopt (priv->sock, 0, SRTO_TSBPDDELAY, &lat, sizeof (int));
priv->poll_id = srt_epoll_create ();
if (priv->poll_id == -1) {
GST_ELEMENT_ERROR (self, LIBRARY, INIT, (NULL),
("failed to create poll id for SRT socket (reason: %s)",
srt_getlasterror_str ()));
goto failed;
}
srt_epoll_add_usock (priv->poll_id, priv->sock, &(int) {
SRT_EPOLL_IN});
if (srt_bind (priv->sock, &sa, sa_len) == SRT_ERROR) {
GST_ELEMENT_ERROR (self, RESOURCE, OPEN_READ, (NULL),
("failed to bind SRT server socket (reason: %s)",
srt_getlasterror_str ()));
goto failed;
}
if (srt_listen (priv->sock, 1) == SRT_ERROR) {
GST_ELEMENT_ERROR (self, RESOURCE, OPEN_READ, (NULL),
("failed to listen SRT socket (reason: %s)", srt_getlasterror_str ()));
goto failed;
}
g_clear_pointer (&uri, gst_uri_unref);
g_clear_object (&socket_address);
return TRUE;
failed:
if (priv->poll_id != SRT_ERROR) {
srt_epoll_release (priv->poll_id);
priv->poll_id = SRT_ERROR;
}
if (priv->sock != SRT_ERROR) {
srt_close (priv->sock);
priv->sock = SRT_ERROR;
}
g_clear_error (&error);
g_clear_pointer (&uri, gst_uri_unref);
g_clear_object (&socket_address);
return FALSE;
}
static gboolean
gst_srt_server_src_stop (GstBaseSrc * src)
{
GstSRTServerSrc *self = GST_SRT_SERVER_SRC (src);
GstSRTServerSrcPrivate *priv = GST_SRT_SERVER_SRC_GET_PRIVATE (self);
if (priv->client_sock != SRT_INVALID_SOCK) {
g_signal_emit (self, signals[SIG_CLIENT_ADDED], 0,
priv->client_sock, priv->client_sockaddr);
srt_close (priv->client_sock);
g_clear_object (&priv->client_sockaddr);
priv->client_sock = SRT_INVALID_SOCK;
priv->has_client = FALSE;
}
if (priv->poll_id != SRT_ERROR) {
srt_epoll_remove_usock (priv->poll_id, priv->sock);
srt_epoll_release (priv->poll_id);
priv->poll_id = SRT_ERROR;
}
if (priv->sock != SRT_INVALID_SOCK) {
GST_DEBUG_OBJECT (self, "closing SRT connection");
srt_close (priv->sock);
priv->sock = SRT_INVALID_SOCK;
}
priv->cancelled = FALSE;
return TRUE;
}
static gboolean
gst_srt_server_src_unlock (GstBaseSrc * src)
{
GstSRTServerSrc *self = GST_SRT_SERVER_SRC (src);
GstSRTServerSrcPrivate *priv = GST_SRT_SERVER_SRC_GET_PRIVATE (self);
priv->cancelled = TRUE;
return TRUE;
}
static gboolean
gst_srt_server_src_unlock_stop (GstBaseSrc * src)
{
GstSRTServerSrc *self = GST_SRT_SERVER_SRC (src);
GstSRTServerSrcPrivate *priv = GST_SRT_SERVER_SRC_GET_PRIVATE (self);
priv->cancelled = FALSE;
return TRUE;
}
static void
gst_srt_server_src_class_init (GstSRTServerSrcClass * klass)
{
GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
GstBaseSrcClass *gstbasesrc_class = GST_BASE_SRC_CLASS (klass);
GstPushSrcClass *gstpushsrc_class = GST_PUSH_SRC_CLASS (klass);
gobject_class->set_property = gst_srt_server_src_set_property;
gobject_class->get_property = gst_srt_server_src_get_property;
gobject_class->finalize = gst_srt_server_src_finalize;
/**
* GstSRTServerSrc:poll-timeout:
*
* The timeout(ms) value when polling SRT socket. For #GstSRTServerSrc,
* this value shouldn't be set as -1 (infinite) because "srt_epoll_wait"
* isn't cancellable unless closing the socket.
*/
properties[PROP_POLL_TIMEOUT] =
g_param_spec_int ("poll-timeout", "Poll timeout",
"Return poll wait after timeout miliseconds", 0, G_MAXINT32,
SRT_DEFAULT_POLL_TIMEOUT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
g_object_class_install_properties (gobject_class, PROP_LAST, properties);
/**
* GstSRTServerSrc::client-added:
* @gstsrtserversrc: the srtserversrc element that emitted this signal
* @sock: the client socket descriptor that was added to srtserversrc
* @addr: the pointer of "struct sockaddr" that describes the @sock
* @addr_len: the length of @addr
*
* The given socket descriptor was added to srtserversrc.
*/
signals[SIG_CLIENT_ADDED] =
g_signal_new ("client-added", G_TYPE_FROM_CLASS (klass),
G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstSRTServerSrcClass, client_added),
NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE,
2, G_TYPE_INT, G_TYPE_SOCKET_ADDRESS);
/**
* GstSRTServerSrc::client-closed:
* @gstsrtserversrc: the srtserversrc element that emitted this signal
* @sock: the client socket descriptor that was added to srtserversrc
* @addr: the pointer of "struct sockaddr" that describes the @sock
* @addr_len: the length of @addr
*
* The given socket descriptor was closed.
*/
signals[SIG_CLIENT_CLOSED] =
g_signal_new ("client-closed", G_TYPE_FROM_CLASS (klass),
G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstSRTServerSrcClass, client_closed),
NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE,
2, G_TYPE_INT, G_TYPE_SOCKET_ADDRESS);
gst_element_class_add_static_pad_template (gstelement_class, &src_template);
gst_element_class_set_metadata (gstelement_class,
"SRT Server source", "Source/Network",
"Receive data over the network via SRT",
"Justin Kim <justin.kim@collabora.com>");
gstbasesrc_class->start = GST_DEBUG_FUNCPTR (gst_srt_server_src_start);
gstbasesrc_class->stop = GST_DEBUG_FUNCPTR (gst_srt_server_src_stop);
gstbasesrc_class->unlock = GST_DEBUG_FUNCPTR (gst_srt_server_src_unlock);
gstbasesrc_class->unlock_stop =
GST_DEBUG_FUNCPTR (gst_srt_server_src_unlock_stop);
gstpushsrc_class->fill = GST_DEBUG_FUNCPTR (gst_srt_server_src_fill);
}
static void
gst_srt_server_src_init (GstSRTServerSrc * self)
{
GstSRTServerSrcPrivate *priv = GST_SRT_SERVER_SRC_GET_PRIVATE (self);
priv->sock = SRT_INVALID_SOCK;
priv->client_sock = SRT_INVALID_SOCK;
priv->poll_id = SRT_ERROR;
priv->poll_timeout = SRT_DEFAULT_POLL_TIMEOUT;
}

63
ext/srt/gstsrtserversrc.h Normal file
View file

@ -0,0 +1,63 @@
/* GStreamer
* Copyright (C) 2017, Collabora Ltd.
* Author:Justin Kim <justin.kim@collabora.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
* Boston, MA 02110-1301, USA.
*/
#ifndef __GST_SRT_SERVER_SRC_H__
#define __GST_SRT_SERVER_SRC_H__
#include "gstsrtbasesrc.h"
#include <sys/socket.h>
G_BEGIN_DECLS
#define GST_TYPE_SRT_SERVER_SRC (gst_srt_server_src_get_type ())
#define GST_IS_SRT_SERVER_SRC(obj) (G_TYPE_CHECK_INSTANCE_TYPE ((obj), GST_TYPE_SRT_SERVER_SRC))
#define GST_IS_SRT_SERVER_SRC_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE ((klass), GST_TYPE_SRT_SERVER_SRC))
#define GST_SRT_SERVER_SRC_GET_CLASS(obj) (G_TYPE_INSTANCE_GET_CLASS ((obj), GST_TYPE_SRT_SERVER_SRC, GstSRTServerSrcClass))
#define GST_SRT_SERVER_SRC(obj) (G_TYPE_CHECK_INSTANCE_CAST ((obj), GST_TYPE_SRT_SERVER_SRC, GstSRTServerSrc))
#define GST_SRT_SERVER_SRC_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST ((klass), GST_TYPE_SRT_SERVER_SRC, GstSRTServerSrcClass))
#define GST_SRT_SERVER_SRC_CAST(obj) ((GstSRTServerSrc*)(obj))
#define GST_SRT_SERVER_SRC_CLASS_CAST(klass) ((GstSRTServerSrcClass*)(klass))
typedef struct _GstSRTServerSrc GstSRTServerSrc;
typedef struct _GstSRTServerSrcClass GstSRTServerSrcClass;
typedef struct _GstSRTServerSrcPrivate GstSRTServerSrcPrivate;
struct _GstSRTServerSrc {
GstSRTBaseSrc parent;
/*< private >*/
gpointer _gst_reserved[GST_PADDING];
};
struct _GstSRTServerSrcClass {
GstSRTBaseSrcClass parent_class;
void (*client_added) (GstSRTServerSrc *self, int sock, struct sockaddr *addr, int addr_len);
void (*client_closed) (GstSRTServerSrc *self, int sock, struct sockaddr *addr, int addr_len);
gpointer _gst_reserved[GST_PADDING_LARGE];
};
GST_EXPORT
GType gst_srt_server_src_get_type (void);
G_END_DECLS
#endif /* __GST_SRT_SERVER_SRC_H__ */

27
ext/srt/meson.build Normal file
View file

@ -0,0 +1,27 @@
srt_sources = [
'gstsrt.c',
'gstsrtbasesrc.c',
'gstsrtclientsrc.c',
'gstsrtserversrc.c',
'gstsrtbasesink.c',
'gstsrtclientsink.c',
'gstsrtserversink.c',
]
srt_dep = dependency('libsrt', required : false)
if not srt_dep.found() and cc.has_header_symbol('srt/srt.h', 'srt_startup')
srt_dep = cc.find_library('srt', required : false)
endif
if srt_dep.found()
gstsrt = library('gstsrt',
srt_sources,
c_args : gst_plugins_bad_args,
link_args : noseh_link_args,
include_directories : [configinc, libsinc],
dependencies : [gstbase_dep, gio_dep, srt_dep],
install : true,
install_dir : plugins_install_dir,
)
endif