diff --git a/configure.ac b/configure.ac index 3b5c4fbe3e..60afdfb6f2 100644 --- a/configure.ac +++ b/configure.ac @@ -2533,6 +2533,15 @@ AG_GST_CHECK_FEATURE(LIBMMS, [mms protocol library], libmms, [ ]) AC_SUBST(LIBMMS_LIBS) +dnl *** libsrt *** +translit(dnm, m, l) AM_CONDITIONAL(USE_SRT, true) +AG_GST_CHECK_FEATURE(SRT, [srt library], srt, [ + PKG_CHECK_MODULES(SRT, libsrt, HAVE_SRT="yes", + AG_GST_CHECK_LIBHEADER(SRT, srt, srt_startup, , srt/srt.h, SRT_LIBS="-lsrt") + ) + AC_SUBST(SRT_LIBS) + AC_SUBST(SRT_CFLAGS) +]) dnl *** libsrtp *** translit(dnm, m, l) AM_CONDITIONAL(USE_SRTP, true) @@ -3793,6 +3802,7 @@ ext/smoothstreaming/Makefile ext/sndfile/Makefile ext/soundtouch/Makefile ext/spandsp/Makefile +ext/srt/Makefile ext/srtp/Makefile ext/teletextdec/Makefile ext/gme/Makefile diff --git a/ext/Makefile.am b/ext/Makefile.am index aeaa9f1f69..0c35ff53e5 100644 --- a/ext/Makefile.am +++ b/ext/Makefile.am @@ -328,6 +328,12 @@ else SPC_DIR= endif +if USE_SRT +SRT_DIR=srt +else +SRT_DIR= +endif + if USE_SRTP SRTP_DIR=srtp else @@ -457,6 +463,7 @@ SUBDIRS=\ $(SPANDSP_DIR) \ $(GME_DIR) \ $(SPC_DIR) \ + $(SRT_DIR) \ $(SRTP_DIR) \ $(TELETEXTDEC_DIR) \ $(WILDMIDI_DIR) \ @@ -522,6 +529,7 @@ DIST_SUBDIRS = \ soundtouch \ spandsp \ spc \ + srt \ srtp \ gme \ teletextdec \ diff --git a/ext/meson.build b/ext/meson.build index 2f3eeb6f08..fbe3513d22 100644 --- a/ext/meson.build +++ b/ext/meson.build @@ -54,6 +54,7 @@ if cc.get_id() != 'msvc' subdir('spandsp') endif #subdir('spc') +subdir('srt') subdir('srtp') #subdir('teletextdec') #subdir('wildmidi') diff --git a/ext/srt/Makefile.am b/ext/srt/Makefile.am new file mode 100644 index 0000000000..02e0d110a0 --- /dev/null +++ b/ext/srt/Makefile.am @@ -0,0 +1,40 @@ +plugin_LTLIBRARIES = libgstsrt.la + +libgstsrt_la_SOURCES = \ + gstsrt.c \ + gstsrtbasesrc.c \ + gstsrtclientsrc.c \ + gstsrtserversrc.c \ + gstsrtbasesink.c \ + gstsrtclientsink.c \ + gstsrtserversink.c \ + $(NULL) + +libgstsrt_la_CFLAGS = \ + $(GST_PLUGINS_BASE_CFLAGS) \ + $(GST_CFLAGS) \ + $(GIO_CFLAGS) \ + $(SRT_CFLAGS) \ + $(NULL) + +libgstsrt_la_LIBADD = \ + $(GST_PLUGINS_BASE_LIBS) \ + $(GST_LIBS) \ + $(GIO_LIBS) \ + -lgstbase-1.0 \ + $(SRT_LIBS) \ + $(NULL) + +libgstsrt_la_LDFLAGS = $(GST_PLUGIN_LDFLAGS) + +CLEANFILES = $(BUILT_SOURCES) + +noinst_HEADERS = \ + gstsrtbasesink.h \ + gstsrtclientsink.h \ + gstsrtserversrc.h \ + gstsrtbasesrc.h \ + gstsrtclientsrc.h \ + gstsrtserversink.h + +include $(top_srcdir)/common/gst-glib-gen.mak diff --git a/ext/srt/gstsrt.c b/ext/srt/gstsrt.c new file mode 100644 index 0000000000..4afe16920f --- /dev/null +++ b/ext/srt/gstsrt.c @@ -0,0 +1,184 @@ +/* GStreamer + * Copyright (C) 2017, Collabora Ltd. + * Author:Justin Kim + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#include "gstsrt.h" + +#include "gstsrtclientsrc.h" +#include "gstsrtserversrc.h" +#include "gstsrtclientsink.h" +#include "gstsrtserversink.h" + +#define GST_CAT_DEFAULT gst_debug_srt +GST_DEBUG_CATEGORY (GST_CAT_DEFAULT); + +SRTSOCKET +gst_srt_client_connect (GstElement * elem, int sender, + const gchar * host, guint16 port, int rendez_vous, + const gchar * bind_address, guint16 bind_port, int latency, + GSocketAddress ** socket_address, gint * poll_id) +{ + SRTSOCKET sock; + GError *error = NULL; + gpointer sa; + size_t sa_len; + + *socket_address = g_inet_socket_address_new_from_string (host, port); + + if (*socket_address == NULL) { + GST_ELEMENT_ERROR (elem, RESOURCE, OPEN_READ, ("Invalid host"), + ("Failed to parse host")); + goto failed; + } + + sa_len = g_socket_address_get_native_size (*socket_address); + sa = g_alloca (sa_len); + if (!g_socket_address_to_native (*socket_address, sa, sa_len, &error)) { + GST_ELEMENT_ERROR (elem, RESOURCE, OPEN_READ, ("Invalid address"), + ("cannot resolve address (reason: %s)", error->message)); + goto failed; + } + + sock = srt_socket (g_socket_address_get_family (*socket_address), SOCK_DGRAM, + 0); + if (sock == SRT_ERROR) { + GST_ELEMENT_ERROR (elem, LIBRARY, INIT, (NULL), + ("failed to create SRT socket (reason: %s)", srt_getlasterror_str ())); + goto failed; + } + + /* Make sure TSBPD mode is enable (SRT mode) */ + srt_setsockopt (sock, 0, SRTO_TSBPDMODE, &(int) { + 1}, sizeof (int)); + + /* This is a sink, we're always a receiver */ + srt_setsockopt (sock, 0, SRTO_SENDER, &sender, sizeof (int)); + + srt_setsockopt (sock, 0, SRTO_TSBPDDELAY, &latency, sizeof (int)); + + srt_setsockopt (sock, 0, SRTO_RENDEZVOUS, &rendez_vous, sizeof (int)); + + if (bind_address || bind_port || rendez_vous) { + gpointer bsa; + size_t bsa_len; + GSocketAddress *b_socket_address = NULL; + + if (bind_address == NULL) + bind_address = "0.0.0.0"; + + if (rendez_vous) + bind_port = port; + + b_socket_address = g_inet_socket_address_new_from_string (bind_address, + bind_port); + + if (b_socket_address == NULL) { + GST_ELEMENT_ERROR (elem, RESOURCE, OPEN_READ, ("Invalid bind address"), + ("Failed to parse bind address: %s:%d", bind_address, bind_port)); + goto failed; + } + + bsa_len = g_socket_address_get_native_size (b_socket_address); + bsa = g_alloca (bsa_len); + if (!g_socket_address_to_native (b_socket_address, bsa, bsa_len, &error)) { + GST_ELEMENT_ERROR (elem, RESOURCE, OPEN_READ, ("Invalid bind address"), + ("Can't parse bind address to sockaddr: %s", error->message)); + g_clear_object (&b_socket_address); + goto failed; + } + g_clear_object (&b_socket_address); + + if (srt_bind (sock, bsa, bsa_len) == SRT_ERROR) { + GST_ELEMENT_ERROR (elem, RESOURCE, OPEN_READ, + ("Can't bind to address"), + ("Can't bind to %s:%d (reason: %s)", bind_address, bind_port, + srt_getlasterror_str ())); + goto failed; + } + } + + *poll_id = srt_epoll_create (); + if (*poll_id == -1) { + GST_ELEMENT_ERROR (elem, LIBRARY, INIT, (NULL), + ("failed to create poll id for SRT socket (reason: %s)", + srt_getlasterror_str ())); + goto failed; + } + + srt_epoll_add_usock (*poll_id, sock, &(int) { + SRT_EPOLL_OUT}); + + if (srt_connect (sock, sa, sa_len) == SRT_ERROR) { + GST_ELEMENT_ERROR (elem, RESOURCE, OPEN_READ, ("Connection error"), + ("failed to connect to host (reason: %s)", srt_getlasterror_str ())); + goto failed; + } + + return sock; + +failed: + if (*poll_id != SRT_ERROR) { + srt_epoll_release (*poll_id); + *poll_id = SRT_ERROR; + } + + if (sock != SRT_INVALID_SOCK) { + srt_close (sock); + sock = SRT_INVALID_SOCK; + } + + g_clear_error (&error); + g_clear_object (socket_address); + + return SRT_INVALID_SOCK; +} + +static gboolean +plugin_init (GstPlugin * plugin) +{ + GST_DEBUG_CATEGORY_INIT (GST_CAT_DEFAULT, "srt", 0, "SRT Common code"); + + if (!gst_element_register (plugin, "srtclientsrc", GST_RANK_PRIMARY, + GST_TYPE_SRT_CLIENT_SRC)) + return FALSE; + + if (!gst_element_register (plugin, "srtserversrc", GST_RANK_PRIMARY, + GST_TYPE_SRT_SERVER_SRC)) + return FALSE; + + if (!gst_element_register (plugin, "srtclientsink", GST_RANK_PRIMARY, + GST_TYPE_SRT_CLIENT_SINK)) + return FALSE; + + if (!gst_element_register (plugin, "srtserversink", GST_RANK_PRIMARY, + GST_TYPE_SRT_SERVER_SINK)) + return FALSE; + + return TRUE; +} + +GST_PLUGIN_DEFINE (GST_VERSION_MAJOR, + GST_VERSION_MINOR, + srt, + "transfer data via SRT", + plugin_init, VERSION, GST_LICENSE, GST_PACKAGE_NAME, GST_PACKAGE_ORIGIN); diff --git a/ext/srt/gstsrt.h b/ext/srt/gstsrt.h new file mode 100644 index 0000000000..7a7fec779c --- /dev/null +++ b/ext/srt/gstsrt.h @@ -0,0 +1,46 @@ +/* GStreamer + * Copyright (C) 2017, Collabora Ltd. + * Author: Olivier Crete + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +#ifndef __GST_SRT_H__ +#define __GST_SRT_H__ + +#include +#include + +#include + +#define SRT_URI_SCHEME "srt" +#define SRT_DEFAULT_PORT 7001 +#define SRT_DEFAULT_HOST "127.0.0.1" +#define SRT_DEFAULT_URI SRT_URI_SCHEME"://"SRT_DEFAULT_HOST":"G_STRINGIFY(SRT_DEFAULT_PORT) +#define SRT_DEFAULT_LATENCY 125 + +G_BEGIN_DECLS + +SRTSOCKET +gst_srt_client_connect (GstElement * elem, int sender, + const gchar * host, guint16 port, int rendez_vous, + const gchar * bind_address, guint16 bind_port, int latency, + GSocketAddress ** socket_address, gint * poll_id); + +G_END_DECLS + + +#endif /* __GST_SRT_H__ */ diff --git a/ext/srt/gstsrtbasesink.c b/ext/srt/gstsrtbasesink.c new file mode 100644 index 0000000000..a9fdc96c46 --- /dev/null +++ b/ext/srt/gstsrtbasesink.c @@ -0,0 +1,302 @@ +/* GStreamer SRT plugin based on libsrt + * Copyright (C) 2017, Collabora Ltd. + * Author:Justin Kim + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#include "gstsrtserversink.h" +#include "gstsrt.h" +#include + +#include + +#define SRT_DEFAULT_POLL_TIMEOUT -1 + +#define GST_CAT_DEFAULT gst_debug_srt_base_sink +GST_DEBUG_CATEGORY (GST_CAT_DEFAULT); + +enum +{ + PROP_URI = 1, + PROP_LATENCY, + /*< private > */ + PROP_LAST +}; + +static GParamSpec *properties[PROP_LAST]; + +static void gst_srt_base_sink_uri_handler_init (gpointer g_iface, + gpointer iface_data); +static gchar *gst_srt_base_sink_uri_get_uri (GstURIHandler * handler); +static gboolean gst_srt_base_sink_uri_set_uri (GstURIHandler * handler, + const gchar * uri, GError ** error); + +#define gst_srt_base_sink_parent_class parent_class +G_DEFINE_ABSTRACT_TYPE_WITH_CODE (GstSRTBaseSink, gst_srt_base_sink, + GST_TYPE_BASE_SINK, + G_IMPLEMENT_INTERFACE (GST_TYPE_URI_HANDLER, + gst_srt_base_sink_uri_handler_init) + GST_DEBUG_CATEGORY_INIT (GST_CAT_DEFAULT, "srtbasesink", 0, + "SRT Base Sink")); + +static void +gst_srt_base_sink_get_property (GObject * object, + guint prop_id, GValue * value, GParamSpec * pspec) +{ + GstSRTBaseSink *self = GST_SRT_BASE_SINK (object); + + switch (prop_id) { + case PROP_URI: + if (self->uri != NULL) { + gchar *uri_str = gst_srt_base_sink_uri_get_uri (GST_URI_HANDLER (self)); + g_value_take_string (value, uri_str); + } + break; + case PROP_LATENCY: + g_value_set_int (value, self->latency); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +static void +gst_srt_base_sink_set_property (GObject * object, + guint prop_id, const GValue * value, GParamSpec * pspec) +{ + GstSRTBaseSink *self = GST_SRT_BASE_SINK (object); + + switch (prop_id) { + case PROP_URI: + gst_srt_base_sink_uri_set_uri (GST_URI_HANDLER (self), + g_value_get_string (value), NULL); + break; + case PROP_LATENCY: + self->latency = g_value_get_int (value); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +static void +gst_srt_base_sink_finalize (GObject * object) +{ + GstSRTBaseSink *self = GST_SRT_BASE_SINK (object); + + g_clear_pointer (&self->uri, gst_uri_unref); + + G_OBJECT_CLASS (parent_class)->finalize (object); +} + +static GstFlowReturn +gst_srt_base_sink_render (GstBaseSink * sink, GstBuffer * buffer) +{ + GstSRTBaseSink *self = GST_SRT_BASE_SINK (sink); + GstMapInfo info; + GstSRTBaseSinkClass *bclass = GST_SRT_BASE_SINK_GET_CLASS (sink); + GstFlowReturn ret = GST_FLOW_OK; + + GST_TRACE_OBJECT (self, "sending buffer %p, offset %" + G_GINT64_FORMAT ", offset_end %" G_GINT64_FORMAT + ", timestamp %" GST_TIME_FORMAT ", duration %" GST_TIME_FORMAT + ", size %" G_GSIZE_FORMAT, + buffer, GST_BUFFER_OFFSET (buffer), + GST_BUFFER_OFFSET_END (buffer), + GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)), + GST_TIME_ARGS (GST_BUFFER_DURATION (buffer)), + gst_buffer_get_size (buffer)); + + if (!gst_buffer_map (buffer, &info, GST_MAP_READ)) { + GST_ELEMENT_ERROR (self, RESOURCE, READ, + ("Could not map the input stream"), (NULL)); + return GST_FLOW_ERROR; + } + + if (!bclass->send_buffer (self, &info)) + ret = GST_FLOW_ERROR; + + gst_buffer_unmap (buffer, &info); + + return ret; +} + +static void +gst_srt_base_sink_class_init (GstSRTBaseSinkClass * klass) +{ + GObjectClass *gobject_class = G_OBJECT_CLASS (klass); + GstBaseSinkClass *gstbasesink_class = GST_BASE_SINK_CLASS (klass); + + gobject_class->set_property = gst_srt_base_sink_set_property; + gobject_class->get_property = gst_srt_base_sink_get_property; + gobject_class->finalize = gst_srt_base_sink_finalize; + + /** + * GstSRTBaseSink:uri: + * + * The URI used by SRT Connection. + */ + properties[PROP_URI] = g_param_spec_string ("uri", "URI", + "URI in the form of srt://address:port", SRT_DEFAULT_URI, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS); + + properties[PROP_LATENCY] = + g_param_spec_int ("latency", "latency", + "Minimum latency (milliseconds)", 0, + G_MAXINT32, SRT_DEFAULT_LATENCY, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS); + + g_object_class_install_properties (gobject_class, PROP_LAST, properties); + + gstbasesink_class->render = GST_DEBUG_FUNCPTR (gst_srt_base_sink_render); +} + +static void +gst_srt_base_sink_init (GstSRTBaseSink * self) +{ + self->uri = gst_uri_from_string (SRT_DEFAULT_URI); + self->queued_buffers = NULL; + self->latency = SRT_DEFAULT_LATENCY; +} + +static GstURIType +gst_srt_base_sink_uri_get_type (GType type) +{ + return GST_URI_SINK; +} + +static const gchar *const * +gst_srt_base_sink_uri_get_protocols (GType type) +{ + static const gchar *protocols[] = { SRT_URI_SCHEME, NULL }; + + return protocols; +} + +static gchar * +gst_srt_base_sink_uri_get_uri (GstURIHandler * handler) +{ + gchar *uri_str; + GstSRTBaseSink *self = GST_SRT_BASE_SINK (handler); + + GST_OBJECT_LOCK (self); + uri_str = gst_uri_to_string (self->uri); + GST_OBJECT_UNLOCK (self); + + return uri_str; +} + +static gboolean +gst_srt_base_sink_uri_set_uri (GstURIHandler * handler, + const gchar * uri, GError ** error) +{ + GstSRTBaseSink *self = GST_SRT_BASE_SINK (handler); + gboolean ret = TRUE; + GstUri *parsed_uri = gst_uri_from_string (uri); + + GST_TRACE_OBJECT (self, "Requested URI=%s", uri); + + if (g_strcmp0 (gst_uri_get_scheme (parsed_uri), SRT_URI_SCHEME) != 0) { + g_set_error (error, GST_URI_ERROR, GST_URI_ERROR_BAD_URI, + "Invalid SRT URI scheme"); + ret = FALSE; + goto out; + } + + GST_OBJECT_LOCK (self); + + g_clear_pointer (&self->uri, gst_uri_unref); + self->uri = gst_uri_ref (parsed_uri); + + GST_OBJECT_UNLOCK (self); + +out: + g_clear_pointer (&parsed_uri, gst_uri_unref); + return ret; +} + +static void +gst_srt_base_sink_uri_handler_init (gpointer g_iface, gpointer iface_data) +{ + GstURIHandlerInterface *iface = (GstURIHandlerInterface *) g_iface; + + iface->get_type = gst_srt_base_sink_uri_get_type; + iface->get_protocols = gst_srt_base_sink_uri_get_protocols; + iface->get_uri = gst_srt_base_sink_uri_get_uri; + iface->set_uri = gst_srt_base_sink_uri_set_uri; +} + +GstStructure * +gst_srt_base_sink_get_stats (GSocketAddress * sockaddr, SRTSOCKET sock) +{ + SRT_TRACEBSTATS stats; + int ret; + GValue v = G_VALUE_INIT; + GstStructure *s; + + if (sock == SRT_INVALID_SOCK || sockaddr == NULL) + return gst_structure_new_empty ("application/x-srt-statistics"); + + s = gst_structure_new ("application/x-srt-statistics", + "sockaddr", G_TYPE_SOCKET_ADDRESS, sockaddr, NULL); + + ret = srt_bstats (sock, &stats, 0); + if (ret >= 0) { + gst_structure_set (s, + /* number of sent data packets, including retransmissions */ + "packets-sent", G_TYPE_INT64, stats.pktSent, + /* number of lost packets (sender side) */ + "packets-sent-lost", G_TYPE_INT, stats.pktSndLoss, + /* number of retransmitted packets */ + "packets-retransmitted", G_TYPE_INT, stats.pktRetrans, + /* number of received ACK packets */ + "packet-ack-received", G_TYPE_INT, stats.pktRecvACK, + /* number of received NAK packets */ + "packet-nack-received", G_TYPE_INT, stats.pktRecvNAK, + /* time duration when UDT is sending data (idle time exclusive) */ + "send-duration-us", G_TYPE_INT64, stats.usSndDuration, + /* number of sent data bytes, including retransmissions */ + "bytes-sent", G_TYPE_UINT64, stats.byteSent, + /* number of retransmitted bytes */ + "bytes-retransmitted", G_TYPE_UINT64, stats.byteRetrans, + /* number of too-late-to-send dropped bytes */ + "bytes-sent-dropped", G_TYPE_UINT64, stats.byteSndDrop, + /* number of too-late-to-send dropped packets */ + "packets-sent-dropped", G_TYPE_INT, stats.pktSndDrop, + /* sending rate in Mb/s */ + "send-rate-mbps", G_TYPE_DOUBLE, stats.msRTT, + /* estimated bandwidth, in Mb/s */ + "bandwidth-mbps", G_TYPE_DOUBLE, stats.mbpsBandwidth, + /* busy sending time (i.e., idle time exclusive) */ + "send-duration-us", G_TYPE_UINT64, stats.usSndDuration, + "rtt-ms", G_TYPE_DOUBLE, stats.msRTT, + "negotiated-latency-ms", G_TYPE_INT, stats.msSndTsbPdDelay, NULL); + } + + g_value_init (&v, G_TYPE_STRING); + g_value_take_string (&v, + g_socket_connectable_to_string (G_SOCKET_CONNECTABLE (sockaddr))); + gst_structure_take_value (s, "sockaddr-str", &v); + + return s; +} diff --git a/ext/srt/gstsrtbasesink.h b/ext/srt/gstsrtbasesink.h new file mode 100644 index 0000000000..6e0b918353 --- /dev/null +++ b/ext/srt/gstsrtbasesink.h @@ -0,0 +1,73 @@ +/* GStreamer + * Copyright (C) 2017, Collabora Ltd. + * Author:Justin Kim + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +#ifndef __GST_SRT_BASE_SINK_H__ +#define __GST_SRT_BASE_SINK_H__ + +#include +#include +#include + +#include + +G_BEGIN_DECLS + +#define GST_TYPE_SRT_BASE_SINK (gst_srt_base_sink_get_type ()) +#define GST_IS_SRT_BASE_SINK(obj) (G_TYPE_CHECK_INSTANCE_TYPE ((obj), GST_TYPE_SRT_BASE_SINK)) +#define GST_IS_SRT_BASE_SINK_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE ((klass), GST_TYPE_SRT_BASE_SINK)) +#define GST_SRT_BASE_SINK_GET_CLASS(obj) (G_TYPE_INSTANCE_GET_CLASS ((obj), GST_TYPE_SRT_BASE_SINK, GstSRTBaseSinkClass)) +#define GST_SRT_BASE_SINK(obj) (G_TYPE_CHECK_INSTANCE_CAST ((obj), GST_TYPE_SRT_BASE_SINK, GstSRTBaseSink)) +#define GST_SRT_BASE_SINK_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST ((klass), GST_TYPE_SRT_BASE_SINK, GstSRTBaseSinkClass)) +#define GST_SRT_BASE_SINK_CAST(obj) ((GstSRTBaseSink*)(obj)) +#define GST_SRT_BASE_SINK_CLASS_CAST(klass) ((GstSRTBaseSinkClass*)(klass)) + +typedef struct _GstSRTBaseSink GstSRTBaseSink; +typedef struct _GstSRTBaseSinkClass GstSRTBaseSinkClass; + +struct _GstSRTBaseSink { + GstBaseSink parent; + + GstUri *uri; + GList *queued_buffers; + gint latency; + + /*< private >*/ + gpointer _gst_reserved[GST_PADDING]; +}; + +struct _GstSRTBaseSinkClass { + GstBaseSinkClass parent_class; + + /* ask the subclass to send a buffer */ + gboolean (*send_buffer) (GstSRTBaseSink *self, const GstMapInfo *mapinfo); + + gpointer _gst_reserved[GST_PADDING_LARGE]; +}; + +GST_EXPORT +GType gst_srt_base_sink_get_type (void); + +GstStructure * gst_srt_base_sink_get_stats (GSocketAddress *sockaddr, + SRTSOCKET sock); + + +G_END_DECLS + +#endif /* __GST_SRT_BASE_SINK_H__ */ diff --git a/ext/srt/gstsrtbasesrc.c b/ext/srt/gstsrtbasesrc.c new file mode 100644 index 0000000000..a34ae38520 --- /dev/null +++ b/ext/srt/gstsrtbasesrc.c @@ -0,0 +1,262 @@ +/* GStreamer SRT plugin based on libsrt + * Copyright (C) 2017, Collabora Ltd. + * Author:Justin Kim + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#include "gstsrtbasesrc.h" +#include "gstsrt.h" +#include +#include + +#include + +#define GST_CAT_DEFAULT gst_debug_srt_base_src +GST_DEBUG_CATEGORY (GST_CAT_DEFAULT); + +enum +{ + PROP_URI = 1, + PROP_CAPS, + PROP_LATENCY, + + /*< private > */ + PROP_LAST +}; + +static GParamSpec *properties[PROP_LAST]; + +static void gst_srt_base_src_uri_handler_init (gpointer g_iface, + gpointer iface_data); +static gchar *gst_srt_base_src_uri_get_uri (GstURIHandler * handler); +static gboolean gst_srt_base_src_uri_set_uri (GstURIHandler * handler, + const gchar * uri, GError ** error); + +#define gst_srt_base_src_parent_class parent_class +G_DEFINE_ABSTRACT_TYPE_WITH_CODE (GstSRTBaseSrc, gst_srt_base_src, + GST_TYPE_PUSH_SRC, G_IMPLEMENT_INTERFACE (GST_TYPE_URI_HANDLER, + gst_srt_base_src_uri_handler_init) + GST_DEBUG_CATEGORY_INIT (GST_CAT_DEFAULT, "srtbasesrc", 0, + "SRT Base Source")); + +static void +gst_srt_base_src_get_property (GObject * object, + guint prop_id, GValue * value, GParamSpec * pspec) +{ + GstSRTBaseSrc *self = GST_SRT_BASE_SRC (object); + + switch (prop_id) { + case PROP_URI: + if (self->uri != NULL) { + gchar *uri_str = gst_srt_base_src_uri_get_uri (GST_URI_HANDLER (self)); + g_value_take_string (value, uri_str); + } + break; + case PROP_CAPS: + GST_OBJECT_LOCK (self); + gst_value_set_caps (value, self->caps); + GST_OBJECT_UNLOCK (self); + break; + case PROP_LATENCY: + g_value_set_int (value, self->latency); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +static void +gst_srt_base_src_set_property (GObject * object, + guint prop_id, const GValue * value, GParamSpec * pspec) +{ + GstSRTBaseSrc *self = GST_SRT_BASE_SRC (object); + + switch (prop_id) { + case PROP_URI: + gst_srt_base_src_uri_set_uri (GST_URI_HANDLER (self), + g_value_get_string (value), NULL); + break; + case PROP_CAPS: + GST_OBJECT_LOCK (self); + g_clear_pointer (&self->caps, gst_caps_unref); + self->caps = gst_caps_copy (gst_value_get_caps (value)); + GST_OBJECT_UNLOCK (self); + break; + case PROP_LATENCY: + self->latency = g_value_get_int (value); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +static void +gst_srt_base_src_finalize (GObject * object) +{ + GstSRTBaseSrc *self = GST_SRT_BASE_SRC (object); + + g_clear_pointer (&self->uri, gst_uri_unref); + g_clear_pointer (&self->caps, gst_caps_unref); + + G_OBJECT_CLASS (parent_class)->finalize (object); +} + +static GstCaps * +gst_srt_base_src_get_caps (GstBaseSrc * src, GstCaps * filter) +{ + GstSRTBaseSrc *self = GST_SRT_BASE_SRC (src); + GstCaps *result, *caps = NULL; + + GST_OBJECT_LOCK (self); + if (self->caps != NULL) { + caps = gst_caps_ref (self->caps); + } + GST_OBJECT_UNLOCK (self); + + if (caps) { + if (filter) { + result = gst_caps_intersect_full (filter, caps, GST_CAPS_INTERSECT_FIRST); + gst_caps_unref (caps); + } else { + result = caps; + } + } else { + result = (filter) ? gst_caps_ref (filter) : gst_caps_new_any (); + } + + return result; +} + + +static void +gst_srt_base_src_class_init (GstSRTBaseSrcClass * klass) +{ + GObjectClass *gobject_class = G_OBJECT_CLASS (klass); + GstBaseSrcClass *gstbasesrc_class = GST_BASE_SRC_CLASS (klass); + + gobject_class->set_property = gst_srt_base_src_set_property; + gobject_class->get_property = gst_srt_base_src_get_property; + gobject_class->finalize = gst_srt_base_src_finalize; + + /** + * GstSRTBaseSrc:uri: + * + * The URI used by SRT Connection. + */ + properties[PROP_URI] = g_param_spec_string ("uri", "URI", + "URI in the form of srt://address:port", SRT_DEFAULT_URI, + G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY | G_PARAM_STATIC_STRINGS); + + /** + * GstSRTBaseSrc:caps: + * + * The Caps used by the source pad. + */ + properties[PROP_CAPS] = + g_param_spec_boxed ("caps", "Caps", "The caps of the source pad", + GST_TYPE_CAPS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS); + + properties[PROP_LATENCY] = + g_param_spec_int ("latency", "latency", + "Minimum latency (milliseconds)", 0, + G_MAXINT32, SRT_DEFAULT_LATENCY, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS); + + g_object_class_install_properties (gobject_class, PROP_LAST, properties); + + gstbasesrc_class->get_caps = GST_DEBUG_FUNCPTR (gst_srt_base_src_get_caps); +} + +static void +gst_srt_base_src_init (GstSRTBaseSrc * self) +{ + gst_srt_base_src_uri_set_uri (GST_URI_HANDLER (self), SRT_DEFAULT_URI, NULL); + gst_base_src_set_format (GST_BASE_SRC (self), GST_FORMAT_TIME); + gst_base_src_set_live (GST_BASE_SRC (self), TRUE); + self->latency = SRT_DEFAULT_LATENCY; +} + +static GstURIType +gst_srt_base_src_uri_get_type (GType type) +{ + return GST_URI_SRC; +} + +static const gchar *const * +gst_srt_base_src_uri_get_protocols (GType type) +{ + static const gchar *protocols[] = { SRT_URI_SCHEME, NULL }; + + return protocols; +} + +static gchar * +gst_srt_base_src_uri_get_uri (GstURIHandler * handler) +{ + gchar *uri_str; + GstSRTBaseSrc *self = GST_SRT_BASE_SRC (handler); + + GST_OBJECT_LOCK (self); + uri_str = gst_uri_to_string (self->uri); + GST_OBJECT_UNLOCK (self); + + return uri_str; +} + +static gboolean +gst_srt_base_src_uri_set_uri (GstURIHandler * handler, + const gchar * uri, GError ** error) +{ + GstSRTBaseSrc *self = GST_SRT_BASE_SRC (handler); + gboolean ret = TRUE; + GstUri *parsed_uri = gst_uri_from_string (uri); + + if (g_strcmp0 (gst_uri_get_scheme (parsed_uri), SRT_URI_SCHEME) != 0) { + g_set_error (error, GST_URI_ERROR, GST_URI_ERROR_BAD_URI, + "Invalid SRT URI scheme"); + ret = FALSE; + goto out; + } + + GST_OBJECT_LOCK (self); + + g_clear_pointer (&self->uri, gst_uri_unref); + self->uri = gst_uri_ref (parsed_uri); + + GST_OBJECT_UNLOCK (self); + +out: + g_clear_pointer (&parsed_uri, gst_uri_unref); + return ret; +} + +static void +gst_srt_base_src_uri_handler_init (gpointer g_iface, gpointer iface_data) +{ + GstURIHandlerInterface *iface = (GstURIHandlerInterface *) g_iface; + + iface->get_type = gst_srt_base_src_uri_get_type; + iface->get_protocols = gst_srt_base_src_uri_get_protocols; + iface->get_uri = gst_srt_base_src_uri_get_uri; + iface->set_uri = gst_srt_base_src_uri_set_uri; +} diff --git a/ext/srt/gstsrtbasesrc.h b/ext/srt/gstsrtbasesrc.h new file mode 100644 index 0000000000..bf7414ee8b --- /dev/null +++ b/ext/srt/gstsrtbasesrc.h @@ -0,0 +1,63 @@ +/* GStreamer + * Copyright (C) 2017, Collabora Ltd. + * Author:Justin Kim + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +#ifndef __GST_SRT_BASE_SRC_H__ +#define __GST_SRT_BASE_SRC_H__ + +#include +#include + +G_BEGIN_DECLS + +#define GST_TYPE_SRT_BASE_SRC (gst_srt_base_src_get_type ()) +#define GST_IS_SRT_BASE_SRC(obj) (G_TYPE_CHECK_INSTANCE_TYPE ((obj), GST_TYPE_SRT_BASE_SRC)) +#define GST_IS_SRT_BASE_SRC_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE ((klass), GST_TYPE_SRT_BASE_SRC)) +#define GST_SRT_BASE_SRC_GET_CLASS(obj) (G_TYPE_INSTANCE_GET_CLASS ((obj), GST_TYPE_SRT_BASE_SRC, GstSRTBaseSrcClass)) +#define GST_SRT_BASE_SRC(obj) (G_TYPE_CHECK_INSTANCE_CAST ((obj), GST_TYPE_SRT_BASE_SRC, GstSRTBaseSrc)) +#define GST_SRT_BASE_SRC_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST ((klass), GST_TYPE_SRT_BASE_SRC, GstSRTBaseSrcClass)) +#define GST_SRT_BASE_SRC_CAST(obj) ((GstSRTBaseSrc*)(obj)) +#define GST_SRT_BASE_SRC_CLASS_CAST(klass) ((GstSRTBaseSrcClass*)(klass)) + +typedef struct _GstSRTBaseSrc GstSRTBaseSrc; +typedef struct _GstSRTBaseSrcClass GstSRTBaseSrcClass; + +struct _GstSRTBaseSrc { + GstPushSrc parent; + + GstUri *uri; + GstCaps *caps; + gint latency; + + /*< private >*/ + gpointer _gst_reserved[GST_PADDING]; +}; + +struct _GstSRTBaseSrcClass { + GstPushSrcClass parent_class; + + gpointer _gst_reserved[GST_PADDING_LARGE]; +}; + +GST_EXPORT +GType gst_srt_base_src_get_type (void); + +G_END_DECLS + +#endif /* __GST_SRT_BASE_SRC_H__ */ diff --git a/ext/srt/gstsrtclientsink.c b/ext/srt/gstsrtclientsink.c new file mode 100644 index 0000000000..7371e08f09 --- /dev/null +++ b/ext/srt/gstsrtclientsink.c @@ -0,0 +1,268 @@ +/* GStreamer SRT plugin based on libsrt + * Copyright (C) 2017, Collabora Ltd. + * Author:Justin Kim + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +/** + * SECTION:element-srtserversink + * @title: srtserversink + * + * srtserversink is a network sink that sends SRT + * packets to the network. Although SRT is an UDP-based protocol, srtserversink works like + * a server socket of connection-oriented protocol. + * + * + * Examples + * |[ + * gst-launch-1.0 -v audiotestsrc ! srtserversink + * ]| This pipeline shows how to serve SRT packets through the default port. + + * |[ + * gst-launch-1.0 -v audiotestsrc ! srtserversink uri=srt://192.168.1.10:8888/ rendez-vous=1 + * ]| This pipeline shows how to serve SRT packets to 192.168.1.10 port 8888 using the rendez-vous mode. + * + * + */ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#include "gstsrtclientsink.h" +#include "gstsrt.h" +#include +#include + +#define SRT_DEFAULT_POLL_TIMEOUT -1 + +static GstStaticPadTemplate sink_template = GST_STATIC_PAD_TEMPLATE ("sink", + GST_PAD_SINK, + GST_PAD_ALWAYS, + GST_STATIC_CAPS_ANY); + +#define GST_CAT_DEFAULT gst_debug_srt_client_sink +GST_DEBUG_CATEGORY (GST_CAT_DEFAULT); + +struct _GstSRTClientSinkPrivate +{ + SRTSOCKET sock; + GSocketAddress *sockaddr; + gint poll_id; + gint poll_timeout; + + gboolean rendez_vous; + gchar *bind_address; + guint16 bind_port; +}; + +#define GST_SRT_CLIENT_SINK_GET_PRIVATE(obj) \ + (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_SRT_CLIENT_SINK, GstSRTClientSinkPrivate)) + +enum +{ + PROP_POLL_TIMEOUT = 1, + PROP_BIND_ADDRESS, + PROP_BIND_PORT, + PROP_RENDEZ_VOUS, + PROP_STATS, + /*< private > */ + PROP_LAST +}; + +static GParamSpec *properties[PROP_LAST]; + +#define gst_srt_client_sink_parent_class parent_class +G_DEFINE_TYPE_WITH_CODE (GstSRTClientSink, gst_srt_client_sink, + GST_TYPE_SRT_BASE_SINK, G_ADD_PRIVATE (GstSRTClientSink) + GST_DEBUG_CATEGORY_INIT (GST_CAT_DEFAULT, "srtclientsink", 0, + "SRT Client Sink")); + +static void +gst_srt_client_sink_get_property (GObject * object, + guint prop_id, GValue * value, GParamSpec * pspec) +{ + GstSRTClientSink *self = GST_SRT_CLIENT_SINK (object); + GstSRTClientSinkPrivate *priv = GST_SRT_CLIENT_SINK_GET_PRIVATE (self); + + switch (prop_id) { + case PROP_POLL_TIMEOUT: + g_value_set_int (value, priv->poll_timeout); + break; + case PROP_BIND_PORT: + g_value_set_int (value, priv->rendez_vous); + break; + case PROP_BIND_ADDRESS: + g_value_set_string (value, priv->bind_address); + break; + case PROP_RENDEZ_VOUS: + g_value_set_boolean (value, priv->bind_port); + break; + case PROP_STATS: + g_value_take_boxed (value, gst_srt_base_sink_get_stats (priv->sockaddr, + priv->sock)); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +static void +gst_srt_client_sink_set_property (GObject * object, + guint prop_id, const GValue * value, GParamSpec * pspec) +{ + GstSRTClientSink *self = GST_SRT_CLIENT_SINK (object); + GstSRTClientSinkPrivate *priv = GST_SRT_CLIENT_SINK_GET_PRIVATE (self); + + switch (prop_id) { + case PROP_POLL_TIMEOUT: + priv->poll_timeout = g_value_get_int (value); + break; + case PROP_BIND_ADDRESS: + g_free (priv->bind_address); + priv->bind_address = g_value_dup_string (value); + break; + case PROP_BIND_PORT: + priv->bind_port = g_value_get_int (value); + break; + case PROP_RENDEZ_VOUS: + priv->rendez_vous = g_value_get_boolean (value); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +static gboolean +gst_srt_client_sink_start (GstBaseSink * sink) +{ + GstSRTClientSink *self = GST_SRT_CLIENT_SINK (sink); + GstSRTClientSinkPrivate *priv = GST_SRT_CLIENT_SINK_GET_PRIVATE (self); + GstSRTBaseSink *base = GST_SRT_BASE_SINK (sink); + GstUri *uri = gst_uri_ref (GST_SRT_BASE_SINK (self)->uri); + + priv->sock = gst_srt_client_connect (GST_ELEMENT (sink), FALSE, + gst_uri_get_host (uri), gst_uri_get_port (uri), priv->rendez_vous, + priv->bind_address, priv->bind_port, base->latency, + &priv->sockaddr, &priv->poll_id); + + g_clear_pointer (&uri, gst_uri_unref); + + return (priv->sock != SRT_INVALID_SOCK); +} + +static gboolean +gst_srt_client_sink_send_buffer (GstSRTBaseSink * sink, + const GstMapInfo * mapinfo) +{ + GstSRTClientSink *self = GST_SRT_CLIENT_SINK (sink); + GstSRTClientSinkPrivate *priv = GST_SRT_CLIENT_SINK_GET_PRIVATE (self); + + if (srt_sendmsg2 (priv->sock, (char *) mapinfo->data, mapinfo->size, + 0) == SRT_ERROR) { + GST_ELEMENT_ERROR (self, RESOURCE, WRITE, NULL, + ("%s", srt_getlasterror_str ())); + return FALSE; + } + + return TRUE; +} + +static gboolean +gst_srt_client_sink_stop (GstBaseSink * sink) +{ + GstSRTClientSink *self = GST_SRT_CLIENT_SINK (sink); + GstSRTClientSinkPrivate *priv = GST_SRT_CLIENT_SINK_GET_PRIVATE (self); + + GST_DEBUG_OBJECT (self, "closing SRT connection"); + + if (priv->poll_id != SRT_ERROR) { + srt_epoll_remove_usock (priv->poll_id, priv->sock); + srt_epoll_release (priv->poll_id); + priv->poll_id = SRT_ERROR; + } + + if (priv->sock != SRT_INVALID_SOCK) { + srt_close (priv->sock); + priv->sock = SRT_INVALID_SOCK; + } + + g_clear_object (&priv->sockaddr); + + return TRUE; +} + +static void +gst_srt_client_sink_class_init (GstSRTClientSinkClass * klass) +{ + GObjectClass *gobject_class = G_OBJECT_CLASS (klass); + GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass); + GstBaseSinkClass *gstbasesink_class = GST_BASE_SINK_CLASS (klass); + GstSRTBaseSinkClass *gstsrtbasesink_class = GST_SRT_BASE_SINK_CLASS (klass); + + gobject_class->set_property = gst_srt_client_sink_set_property; + gobject_class->get_property = gst_srt_client_sink_get_property; + + properties[PROP_POLL_TIMEOUT] = + g_param_spec_int ("poll-timeout", "Poll Timeout", + "Return poll wait after timeout miliseconds (-1 = infinite)", -1, + G_MAXINT32, SRT_DEFAULT_POLL_TIMEOUT, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS); + + properties[PROP_BIND_ADDRESS] = + g_param_spec_string ("bind-address", "Bind Address", + "Address to bind socket to (required for rendez-vous mode) ", NULL, + G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY | G_PARAM_STATIC_STRINGS); + + properties[PROP_BIND_PORT] = + g_param_spec_int ("bind-port", "Bind Port", + "Port to bind socket to (Ignored in rendez-vous mode)", 0, + G_MAXUINT16, 0, + G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY | G_PARAM_STATIC_STRINGS); + + properties[PROP_RENDEZ_VOUS] = + g_param_spec_boolean ("rendez-vous", "Rendez Vous", + "Work in Rendez-Vous mode instead of client/caller mode", FALSE, + G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY | G_PARAM_STATIC_STRINGS); + + properties[PROP_STATS] = g_param_spec_boxed ("stats", "Statistics", + "SRT Statistics", GST_TYPE_STRUCTURE, + G_PARAM_READABLE | G_PARAM_STATIC_STRINGS); + + g_object_class_install_properties (gobject_class, PROP_LAST, properties); + + gst_element_class_add_static_pad_template (gstelement_class, &sink_template); + gst_element_class_set_metadata (gstelement_class, + "SRT client sink", "Sink/Network", + "Send data over the network via SRT", + "Justin Kim "); + + gstbasesink_class->start = GST_DEBUG_FUNCPTR (gst_srt_client_sink_start); + gstbasesink_class->stop = GST_DEBUG_FUNCPTR (gst_srt_client_sink_stop); + + gstsrtbasesink_class->send_buffer = + GST_DEBUG_FUNCPTR (gst_srt_client_sink_send_buffer); +} + +static void +gst_srt_client_sink_init (GstSRTClientSink * self) +{ + GstSRTClientSinkPrivate *priv = GST_SRT_CLIENT_SINK_GET_PRIVATE (self); + priv->poll_timeout = SRT_DEFAULT_POLL_TIMEOUT; +} diff --git a/ext/srt/gstsrtclientsink.h b/ext/srt/gstsrtclientsink.h new file mode 100644 index 0000000000..e910050252 --- /dev/null +++ b/ext/srt/gstsrtclientsink.h @@ -0,0 +1,59 @@ +/* GStreamer + * Copyright (C) 2017, Collabora Ltd. + * Author:Justin Kim + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +#ifndef __GST_SRT_CLIENT_SINK_H__ +#define __GST_SRT_CLIENT_SINK_H__ + +#include "gstsrtbasesink.h" + +G_BEGIN_DECLS + +#define GST_TYPE_SRT_CLIENT_SINK (gst_srt_client_sink_get_type ()) +#define GST_IS_SRT_CLIENT_SINK(obj) (G_TYPE_CHECK_INSTANCE_TYPE ((obj), GST_TYPE_SRT_CLIENT_SINK)) +#define GST_IS_SRT_CLIENT_SINK_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE ((klass), GST_TYPE_SRT_CLIENT_SINK)) +#define GST_SRT_CLIENT_SINK_GET_CLASS(obj) (G_TYPE_INSTANCE_GET_CLASS ((obj), GST_TYPE_SRT_CLIENT_SINK, GstSRTClientSinkClass)) +#define GST_SRT_CLIENT_SINK(obj) (G_TYPE_CHECK_INSTANCE_CAST ((obj), GST_TYPE_SRT_CLIENT_SINK, GstSRTClientSink)) +#define GST_SRT_CLIENT_SINK_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST ((klass), GST_TYPE_SRT_CLIENT_SINK, GstSRTClientSinkClass)) +#define GST_SRT_CLIENT_SINK_CAST(obj) ((GstSRTClientSink*)(obj)) +#define GST_SRT_CLIENT_SINK_CLASS_CAST(klass) ((GstSRTClientSinkClass*)(klass)) + +typedef struct _GstSRTClientSink GstSRTClientSink; +typedef struct _GstSRTClientSinkClass GstSRTClientSinkClass; +typedef struct _GstSRTClientSinkPrivate GstSRTClientSinkPrivate; + +struct _GstSRTClientSink { + GstSRTBaseSink parent; + + /*< private >*/ + gpointer _gst_reserved[GST_PADDING]; +}; + +struct _GstSRTClientSinkClass { + GstSRTBaseSinkClass parent_class; + + gpointer _gst_reserved[GST_PADDING_LARGE]; +}; + +GST_EXPORT +GType gst_srt_client_sink_get_type (void); + +G_END_DECLS + +#endif /* __GST_SRT_CLIENT_SINK_H__ */ diff --git a/ext/srt/gstsrtclientsrc.c b/ext/srt/gstsrtclientsrc.c new file mode 100644 index 0000000000..2ad35b98ee --- /dev/null +++ b/ext/srt/gstsrtclientsrc.c @@ -0,0 +1,339 @@ +/* GStreamer SRT plugin based on libsrt + * Copyright (C) 2017, Collabora Ltd. + * Author:Justin Kim + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +/** + * SECTION:element-srtclientsrc + * @title: srtclientsrc + * + * srtclientsrc is a network source that reads SRT + * packets from the network. Although SRT is a protocol based on UDP, srtclientsrc works like + * a client socket of connection-oriented protocol. + * + * + * Examples + * |[ + * gst-launch-1.0 -v srtclientsrc uri="srt://127.0.0.1:7001" ! fakesink + * ]| This pipeline shows how to connect SRT server by setting #GstSRTClientSrc:uri property. + * + * |[ + * gst-launch-1.0 -v srtclientsrc uri="srt://192.168.1.10:7001" rendez-vous ! fakesink + * ]| This pipeline shows how to connect SRT server by setting #GstSRTClientSrc:uri property and using the rendez-vous mode. + * + * + */ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#include "gstsrtclientsrc.h" +#include +#include + +#include "gstsrt.h" + +#include + +static GstStaticPadTemplate src_template = GST_STATIC_PAD_TEMPLATE ("src", + GST_PAD_SRC, + GST_PAD_ALWAYS, + GST_STATIC_CAPS_ANY); + +#define GST_CAT_DEFAULT gst_debug_srt_client_src +GST_DEBUG_CATEGORY (GST_CAT_DEFAULT); + +struct _GstSRTClientSrcPrivate +{ + SRTSOCKET sock; + gint poll_id; + gint poll_timeout; + + gboolean rendez_vous; + gchar *bind_address; + guint16 bind_port; +}; + +#define GST_SRT_CLIENT_SRC_GET_PRIVATE(obj) \ + (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_SRT_CLIENT_SRC, GstSRTClientSrcPrivate)) + +#define SRT_DEFAULT_POLL_TIMEOUT -1 +enum +{ + PROP_POLL_TIMEOUT = 1, + PROP_BIND_ADDRESS, + PROP_BIND_PORT, + PROP_RENDEZ_VOUS, + + /*< private > */ + PROP_LAST +}; + +static GParamSpec *properties[PROP_LAST + 1]; + +#define gst_srt_client_src_parent_class parent_class +G_DEFINE_TYPE_WITH_CODE (GstSRTClientSrc, gst_srt_client_src, + GST_TYPE_SRT_BASE_SRC, G_ADD_PRIVATE (GstSRTClientSrc) + GST_DEBUG_CATEGORY_INIT (GST_CAT_DEFAULT, "srtclientsrc", 0, + "SRT Client Source")); + +static void +gst_srt_client_src_get_property (GObject * object, + guint prop_id, GValue * value, GParamSpec * pspec) +{ + GstSRTClientSrc *self = GST_SRT_CLIENT_SRC (object); + GstSRTClientSrcPrivate *priv = GST_SRT_CLIENT_SRC_GET_PRIVATE (self); + + switch (prop_id) { + case PROP_POLL_TIMEOUT: + g_value_set_int (value, priv->poll_timeout); + break; + case PROP_BIND_PORT: + g_value_set_int (value, priv->rendez_vous); + break; + case PROP_BIND_ADDRESS: + g_value_set_string (value, priv->bind_address); + break; + case PROP_RENDEZ_VOUS: + g_value_set_boolean (value, priv->bind_port); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +static void +gst_srt_client_src_set_property (GObject * object, + guint prop_id, const GValue * value, GParamSpec * pspec) +{ + GstSRTBaseSrc *self = GST_SRT_BASE_SRC (object); + GstSRTClientSrcPrivate *priv = GST_SRT_CLIENT_SRC_GET_PRIVATE (self); + + switch (prop_id) { + case PROP_POLL_TIMEOUT: + priv->poll_timeout = g_value_get_int (value); + break; + case PROP_BIND_ADDRESS: + g_free (priv->bind_address); + priv->bind_address = g_value_dup_string (value); + break; + case PROP_BIND_PORT: + priv->bind_port = g_value_get_int (value); + break; + case PROP_RENDEZ_VOUS: + priv->rendez_vous = g_value_get_boolean (value); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +static void +gst_srt_client_src_finalize (GObject * object) +{ + GstSRTClientSrc *self = GST_SRT_CLIENT_SRC (object); + GstSRTClientSrcPrivate *priv = GST_SRT_CLIENT_SRC_GET_PRIVATE (self); + + if (priv->poll_id != SRT_ERROR) { + srt_epoll_release (priv->poll_id); + priv->poll_id = SRT_ERROR; + } + + if (priv->sock != SRT_INVALID_SOCK) { + srt_close (priv->sock); + priv->sock = SRT_INVALID_SOCK; + } + + g_free (priv->bind_address); + + G_OBJECT_CLASS (parent_class)->finalize (object); +} + +static GstFlowReturn +gst_srt_client_src_fill (GstPushSrc * src, GstBuffer * outbuf) +{ + GstSRTClientSrc *self = GST_SRT_CLIENT_SRC (src); + GstSRTClientSrcPrivate *priv = GST_SRT_CLIENT_SRC_GET_PRIVATE (self); + GstFlowReturn ret = GST_FLOW_OK; + GstMapInfo info; + SRTSOCKET ready[2]; + gint recv_len; + + if (srt_epoll_wait (priv->poll_id, 0, 0, ready, &(int) { + 2}, priv->poll_timeout, 0, 0, 0, 0) == -1) { + + /* Assuming that timeout error is normal */ + if (srt_getlasterror (NULL) != SRT_ETIMEOUT) { + GST_ELEMENT_ERROR (src, RESOURCE, READ, + (NULL), ("srt_epoll_wait error: %s", srt_getlasterror_str ())); + ret = GST_FLOW_ERROR; + } + srt_clearlasterror (); + goto out; + } + + if (!gst_buffer_map (outbuf, &info, GST_MAP_WRITE)) { + GST_ELEMENT_ERROR (src, RESOURCE, READ, + ("Could not map the buffer for writing "), (NULL)); + ret = GST_FLOW_ERROR; + goto out; + } + + recv_len = srt_recvmsg (priv->sock, (char *) info.data, + gst_buffer_get_size (outbuf)); + + gst_buffer_unmap (outbuf, &info); + + if (recv_len == SRT_ERROR) { + GST_ELEMENT_ERROR (src, RESOURCE, READ, + (NULL), ("srt_recvmsg error: %s", srt_getlasterror_str ())); + ret = GST_FLOW_ERROR; + goto out; + } else if (recv_len == 0) { + ret = GST_FLOW_EOS; + goto out; + } + + GST_BUFFER_PTS (outbuf) = + gst_clock_get_time (GST_ELEMENT_CLOCK (src)) - + GST_ELEMENT_CAST (src)->base_time; + + gst_buffer_resize (outbuf, 0, recv_len); + + GST_LOG_OBJECT (src, + "filled buffer from _get of size %" G_GSIZE_FORMAT ", ts %" + GST_TIME_FORMAT ", dur %" GST_TIME_FORMAT + ", offset %" G_GINT64_FORMAT ", offset_end %" G_GINT64_FORMAT, + gst_buffer_get_size (outbuf), + GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (outbuf)), + GST_TIME_ARGS (GST_BUFFER_DURATION (outbuf)), + GST_BUFFER_OFFSET (outbuf), GST_BUFFER_OFFSET_END (outbuf)); + +out: + return ret; +} + +static gboolean +gst_srt_client_src_start (GstBaseSrc * src) +{ + GstSRTClientSrc *self = GST_SRT_CLIENT_SRC (src); + GstSRTClientSrcPrivate *priv = GST_SRT_CLIENT_SRC_GET_PRIVATE (self); + GstSRTBaseSrc *base = GST_SRT_BASE_SRC (src); + GstUri *uri = gst_uri_ref (base->uri); + GSocketAddress *socket_address = NULL; + + priv->sock = gst_srt_client_connect (GST_ELEMENT (src), FALSE, + gst_uri_get_host (uri), gst_uri_get_port (uri), priv->rendez_vous, + priv->bind_address, priv->bind_port, base->latency, + &socket_address, &priv->poll_id); + + g_clear_object (&socket_address); + g_clear_pointer (&uri, gst_uri_unref); + + return (priv->sock != SRT_INVALID_SOCK); +} + +static gboolean +gst_srt_client_src_stop (GstBaseSrc * src) +{ + GstSRTClientSrc *self = GST_SRT_CLIENT_SRC (src); + GstSRTClientSrcPrivate *priv = GST_SRT_CLIENT_SRC_GET_PRIVATE (self); + + if (priv->poll_id != SRT_ERROR) { + if (priv->sock != SRT_INVALID_SOCK) + srt_epoll_remove_usock (priv->poll_id, priv->sock); + srt_epoll_release (priv->poll_id); + } + priv->poll_id = SRT_ERROR; + + GST_DEBUG_OBJECT (self, "closing SRT connection"); + if (priv->sock != SRT_INVALID_SOCK) + srt_close (priv->sock); + priv->sock = SRT_INVALID_SOCK; + + return TRUE; +} + +static void +gst_srt_client_src_class_init (GstSRTClientSrcClass * klass) +{ + GObjectClass *gobject_class = G_OBJECT_CLASS (klass); + GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass); + GstBaseSrcClass *gstbasesrc_class = GST_BASE_SRC_CLASS (klass); + GstPushSrcClass *gstpushsrc_class = GST_PUSH_SRC_CLASS (klass); + + gobject_class->set_property = gst_srt_client_src_set_property; + gobject_class->get_property = gst_srt_client_src_get_property; + gobject_class->finalize = gst_srt_client_src_finalize; + + /** + * GstSRTClientSrc:poll-timeout: + * + * The timeout(ms) value when polling SRT socket. + */ + properties[PROP_POLL_TIMEOUT] = + g_param_spec_int ("poll-timeout", "Poll timeout", + "Return poll wait after timeout miliseconds (-1 = infinite)", -1, + G_MAXINT32, SRT_DEFAULT_POLL_TIMEOUT, + G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY | G_PARAM_STATIC_STRINGS); + + properties[PROP_BIND_ADDRESS] = + g_param_spec_string ("bind-address", "Bind Address", + "Address to bind socket to (required for rendez-vous mode) ", NULL, + G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY | G_PARAM_STATIC_STRINGS); + + properties[PROP_BIND_PORT] = + g_param_spec_int ("bind-port", "Bind Port", + "Port to bind socket to (Ignored in rendez-vous mode)", 0, + G_MAXUINT16, 0, + G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY | G_PARAM_STATIC_STRINGS); + + properties[PROP_RENDEZ_VOUS] = + g_param_spec_boolean ("rendez-vous", "Rendez Vous", + "Work in Rendez-Vous mode instead of client/caller mode", FALSE, + G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY | G_PARAM_STATIC_STRINGS); + + g_object_class_install_properties (gobject_class, PROP_LAST, properties); + + gst_element_class_add_static_pad_template (gstelement_class, &src_template); + gst_element_class_set_metadata (gstelement_class, + "SRT client source", "Source/Network", + "Receive data over the network via SRT", + "Justin Kim "); + + gstbasesrc_class->start = GST_DEBUG_FUNCPTR (gst_srt_client_src_start); + gstbasesrc_class->stop = GST_DEBUG_FUNCPTR (gst_srt_client_src_stop); + + gstpushsrc_class->fill = GST_DEBUG_FUNCPTR (gst_srt_client_src_fill); +} + +static void +gst_srt_client_src_init (GstSRTClientSrc * self) +{ + GstSRTClientSrcPrivate *priv = GST_SRT_CLIENT_SRC_GET_PRIVATE (self); + + priv->sock = SRT_INVALID_SOCK; + priv->poll_id = SRT_ERROR; + priv->poll_timeout = SRT_DEFAULT_POLL_TIMEOUT; + priv->rendez_vous = FALSE; + priv->bind_address = NULL; + priv->bind_port = 0; +} diff --git a/ext/srt/gstsrtclientsrc.h b/ext/srt/gstsrtclientsrc.h new file mode 100644 index 0000000000..b2003b6ace --- /dev/null +++ b/ext/srt/gstsrtclientsrc.h @@ -0,0 +1,59 @@ +/* GStreamer + * Copyright (C) 2017, Collabora Ltd. + * Author:Justin Kim + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +#ifndef __GST_SRT_CLIENT_SRC_H__ +#define __GST_SRT_CLIENT_SRC_H__ + +#include "gstsrtbasesrc.h" + +G_BEGIN_DECLS + +#define GST_TYPE_SRT_CLIENT_SRC (gst_srt_client_src_get_type ()) +#define GST_IS_SRT_CLIENT_SRC(obj) (G_TYPE_CHECK_INSTANCE_TYPE ((obj), GST_TYPE_SRT_CLIENT_SRC)) +#define GST_IS_SRT_CLIENT_SRC_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE ((klass), GST_TYPE_SRT_CLIENT_SRC)) +#define GST_SRT_CLIENT_SRC_GET_CLASS(obj) (G_TYPE_INSTANCE_GET_CLASS ((obj), GST_TYPE_SRT_CLIENT_SRC, GstSRTClientSrcClass)) +#define GST_SRT_CLIENT_SRC(obj) (G_TYPE_CHECK_INSTANCE_CAST ((obj), GST_TYPE_SRT_CLIENT_SRC, GstSRTClientSrc)) +#define GST_SRT_CLIENT_SRC_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST ((klass), GST_TYPE_SRT_CLIENT_SRC, GstSRTClientSrcClass)) +#define GST_SRT_CLIENT_SRC_CAST(obj) ((GstSRTClientSrc*)(obj)) +#define GST_SRT_CLIENT_SRC_CLASS_CAST(klass) ((GstSRTClientSrcClass*)(klass)) + +typedef struct _GstSRTClientSrc GstSRTClientSrc; +typedef struct _GstSRTClientSrcClass GstSRTClientSrcClass; +typedef struct _GstSRTClientSrcPrivate GstSRTClientSrcPrivate; + +struct _GstSRTClientSrc { + GstSRTBaseSrc parent; + + /*< private >*/ + gpointer _gst_reserved[GST_PADDING]; +}; + +struct _GstSRTClientSrcClass { + GstSRTBaseSrcClass parent_class; + + gpointer _gst_reserved[GST_PADDING_LARGE]; +}; + +GST_EXPORT +GType gst_srt_client_src_get_type (void); + +G_END_DECLS + +#endif /* __GST_SRT_CLIENT_SRC_H__ */ diff --git a/ext/srt/gstsrtserversink.c b/ext/srt/gstsrtserversink.c new file mode 100644 index 0000000000..a4095a016f --- /dev/null +++ b/ext/srt/gstsrtserversink.c @@ -0,0 +1,555 @@ +/* GStreamer SRT plugin based on libsrt + * Copyright (C) 2017, Collabora Ltd. + * Author:Justin Kim + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +/** + * SECTION:element-srtserversink + * @title: srtserversink + * + * srtserversink is a network sink that sends SRT + * packets to the network. Although SRT is an UDP-based protocol, srtserversink works like + * a server socket of connection-oriented protocol. + * + * + * Examples + * |[ + * gst-launch-1.0 -v audiotestsrc ! srtserversink + * ]| This pipeline shows how to serve SRT packets through the default port. + * + * + */ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#include "gstsrtserversink.h" +#include "gstsrt.h" +#include +#include + +#define SRT_DEFAULT_POLL_TIMEOUT -1 + +static GstStaticPadTemplate sink_template = GST_STATIC_PAD_TEMPLATE ("sink", + GST_PAD_SINK, + GST_PAD_ALWAYS, + GST_STATIC_CAPS_ANY); + +#define GST_CAT_DEFAULT gst_debug_srt_server_sink +GST_DEBUG_CATEGORY (GST_CAT_DEFAULT); + +struct _GstSRTServerSinkPrivate +{ + gboolean cancelled; + + SRTSOCKET sock; + gint poll_id; + gint poll_timeout; + + GMainLoop *loop; + GMainContext *context; + GSource *server_source; + GThread *thread; + + GList *clients; +}; + +#define GST_SRT_SERVER_SINK_GET_PRIVATE(obj) \ + (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_SRT_SERVER_SINK, GstSRTServerSinkPrivate)) + +enum +{ + PROP_POLL_TIMEOUT = 1, + PROP_STATS, + /*< private > */ + PROP_LAST +}; + +static GParamSpec *properties[PROP_LAST]; + +enum +{ + SIG_CLIENT_ADDED, + SIG_CLIENT_REMOVED, + + LAST_SIGNAL +}; + +static guint signals[LAST_SIGNAL] = { 0 }; + +#define gst_srt_server_sink_parent_class parent_class +G_DEFINE_TYPE_WITH_CODE (GstSRTServerSink, gst_srt_server_sink, + GST_TYPE_SRT_BASE_SINK, G_ADD_PRIVATE (GstSRTServerSink) + GST_DEBUG_CATEGORY_INIT (GST_CAT_DEFAULT, "srtserversink", 0, + "SRT Server Sink")); + +typedef struct +{ + int sock; + GSocketAddress *sockaddr; +} SRTClient; + +static SRTClient * +srt_client_new (void) +{ + SRTClient *client = g_new0 (SRTClient, 1); + client->sock = SRT_INVALID_SOCK; + return client; +} + +static void +srt_client_free (SRTClient * client) +{ + g_return_if_fail (client != NULL); + + g_clear_object (&client->sockaddr); + + if (client->sock != SRT_INVALID_SOCK) { + srt_close (client->sock); + } + + g_free (client); +} + +static void +srt_emit_client_removed (SRTClient * client, gpointer user_data) +{ + GstSRTServerSink *self = GST_SRT_SERVER_SINK (user_data); + g_return_if_fail (client != NULL && GST_IS_SRT_SERVER_SINK (self)); + + g_signal_emit (self, signals[SIG_CLIENT_REMOVED], 0, client->sock, + client->sockaddr); +} + +static void +gst_srt_server_sink_get_property (GObject * object, + guint prop_id, GValue * value, GParamSpec * pspec) +{ + GstSRTServerSink *self = GST_SRT_SERVER_SINK (object); + GstSRTServerSinkPrivate *priv = GST_SRT_SERVER_SINK_GET_PRIVATE (self); + + switch (prop_id) { + case PROP_POLL_TIMEOUT: + g_value_set_int (value, priv->poll_timeout); + break; + case PROP_STATS: + { + GList *item; + + GST_OBJECT_LOCK (self); + for (item = priv->clients; item; item = item->next) { + SRTClient *client = item->data; + GValue tmp = G_VALUE_INIT; + + g_value_init (&tmp, GST_TYPE_STRUCTURE); + g_value_take_boxed (&tmp, gst_srt_base_sink_get_stats (client->sockaddr, + client->sock)); + gst_value_array_append_and_take_value (value, &tmp); + } + GST_OBJECT_UNLOCK (self); + break; + } + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +static void +gst_srt_server_sink_set_property (GObject * object, + guint prop_id, const GValue * value, GParamSpec * pspec) +{ + GstSRTServerSink *self = GST_SRT_SERVER_SINK (object); + GstSRTServerSinkPrivate *priv = GST_SRT_SERVER_SINK_GET_PRIVATE (self); + + switch (prop_id) { + case PROP_POLL_TIMEOUT: + priv->poll_timeout = g_value_get_int (value); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +static gboolean +idle_listen_callback (gpointer data) +{ + GstSRTServerSink *self = GST_SRT_SERVER_SINK (data); + GstSRTServerSinkPrivate *priv = GST_SRT_SERVER_SINK_GET_PRIVATE (self); + gboolean ret = TRUE; + + SRTClient *client; + SRTSOCKET ready[2]; + struct sockaddr sa; + int sa_len; + + if (srt_epoll_wait (priv->poll_id, ready, &(int) { + 2}, 0, 0, priv->poll_timeout, 0, 0, 0, 0) == -1) { + int srt_errno = srt_getlasterror (NULL); + + if (srt_errno != SRT_ETIMEOUT) { + GST_ELEMENT_ERROR (self, RESOURCE, FAILED, + ("SRT error: %s", srt_getlasterror_str ()), (NULL)); + ret = FALSE; + goto out; + } + + /* Mimicking cancellable */ + if (srt_errno == SRT_ETIMEOUT && priv->cancelled) { + GST_DEBUG_OBJECT (self, "Cancelled waiting for client"); + ret = FALSE; + goto out; + } + } + + client = srt_client_new (); + client->sock = srt_accept (priv->sock, &sa, &sa_len); + + if (client->sock == SRT_INVALID_SOCK) { + GST_WARNING_OBJECT (self, "detected invalid SRT client socket (reason: %s)", + srt_getlasterror_str ()); + srt_clearlasterror (); + srt_client_free (client); + ret = FALSE; + goto out; + } + + client->sockaddr = g_socket_address_new_from_native (&sa, sa_len); + + GST_OBJECT_LOCK (self); + priv->clients = g_list_append (priv->clients, client); + GST_OBJECT_UNLOCK (self); + + g_signal_emit (self, signals[SIG_CLIENT_ADDED], 0, client->sock, + client->sockaddr); + GST_DEBUG_OBJECT (self, "client added"); + +out: + return ret; +} + +static gpointer +thread_func (gpointer data) +{ + GstSRTServerSink *self = GST_SRT_SERVER_SINK (data); + GstSRTServerSinkPrivate *priv = GST_SRT_SERVER_SINK_GET_PRIVATE (self); + + g_main_loop_run (priv->loop); + + return NULL; +} + +static gboolean +gst_srt_server_sink_start (GstBaseSink * sink) +{ + GstSRTServerSink *self = GST_SRT_SERVER_SINK (sink); + GstSRTServerSinkPrivate *priv = GST_SRT_SERVER_SINK_GET_PRIVATE (self); + GstSRTBaseSink *base = GST_SRT_BASE_SINK (sink); + GstUri *uri = gst_uri_ref (GST_SRT_BASE_SINK (self)->uri); + GSocketAddress *socket_address = NULL; + GError *error = NULL; + gboolean ret = TRUE; + struct sockaddr sa; + size_t sa_len; + const gchar *host; + int lat = base->latency; + + if (gst_uri_get_port (uri) == GST_URI_NO_PORT) { + GST_ELEMENT_ERROR (sink, RESOURCE, OPEN_WRITE, NULL, (("Invalid port"))); + return FALSE; + } + + host = gst_uri_get_host (uri); + if (host == NULL) { + GInetAddress *any = g_inet_address_new_any (G_SOCKET_FAMILY_IPV4); + + socket_address = g_inet_socket_address_new (any, gst_uri_get_port (uri)); + g_object_unref (any); + } else { + socket_address = + g_inet_socket_address_new_from_string (host, gst_uri_get_port (uri)); + } + + if (socket_address == NULL) { + GST_WARNING_OBJECT (self, + "failed to extract host or port from the given URI"); + goto failed; + } + + sa_len = g_socket_address_get_native_size (socket_address); + if (!g_socket_address_to_native (socket_address, &sa, sa_len, &error)) { + GST_WARNING_OBJECT (self, "cannot resolve address (reason: %s)", + error->message); + goto failed; + } + + priv->sock = srt_socket (sa.sa_family, SOCK_DGRAM, 0); + if (priv->sock == SRT_INVALID_SOCK) { + GST_WARNING_OBJECT (self, "failed to create SRT socket (reason: %s)", + srt_getlasterror_str ()); + goto failed; + } + + /* Make SRT non-blocking */ + srt_setsockopt (priv->sock, 0, SRTO_SNDSYN, &(int) { + 0}, sizeof (int)); + + /* Make sure TSBPD mode is enable (SRT mode) */ + srt_setsockopt (priv->sock, 0, SRTO_TSBPDMODE, &(int) { + 1}, sizeof (int)); + + /* This is a sink, we're always a sender */ + srt_setsockopt (priv->sock, 0, SRTO_SENDER, &(int) { + 1}, sizeof (int)); + + srt_setsockopt (priv->sock, 0, SRTO_TSBPDDELAY, &lat, sizeof (int)); + + priv->poll_id = srt_epoll_create (); + if (priv->poll_id == -1) { + GST_WARNING_OBJECT (self, + "failed to create poll id for SRT socket (reason: %s)", + srt_getlasterror_str ()); + goto failed; + } + srt_epoll_add_usock (priv->poll_id, priv->sock, &(int) { + SRT_EPOLL_IN}); + + if (srt_bind (priv->sock, &sa, sa_len) == SRT_ERROR) { + GST_WARNING_OBJECT (self, "failed to bind SRT server socket (reason: %s)", + srt_getlasterror_str ()); + goto failed; + } + + if (srt_listen (priv->sock, 1) == SRT_ERROR) { + GST_WARNING_OBJECT (self, "failed to listen SRT socket (reason: %s)", + srt_getlasterror_str ()); + goto failed; + } + + priv->context = g_main_context_new (); + + priv->server_source = g_idle_source_new (); + g_source_set_callback (priv->server_source, + (GSourceFunc) idle_listen_callback, gst_object_ref (self), + (GDestroyNotify) gst_object_unref); + + g_source_attach (priv->server_source, priv->context); + priv->loop = g_main_loop_new (priv->context, TRUE); + + priv->thread = g_thread_try_new ("srtserversink", thread_func, self, &error); + if (error != NULL) { + GST_WARNING_OBJECT (self, "failed to create thread (reason: %s)", + error->message); + ret = FALSE; + } + + g_clear_pointer (&uri, gst_uri_unref); + g_clear_object (&socket_address); + + return ret; + +failed: + if (priv->poll_id != SRT_ERROR) { + srt_epoll_release (priv->poll_id); + priv->poll_id = SRT_ERROR; + } + + if (priv->sock != SRT_INVALID_SOCK) { + srt_close (priv->sock); + priv->sock = SRT_INVALID_SOCK; + } + + g_clear_error (&error); + g_clear_pointer (&uri, gst_uri_unref); + g_clear_object (&socket_address); + + return FALSE; +} + +static gboolean +gst_srt_server_sink_send_buffer (GstSRTBaseSink * sink, + const GstMapInfo * mapinfo) +{ + GstSRTServerSink *self = GST_SRT_SERVER_SINK (sink); + GstSRTServerSinkPrivate *priv = GST_SRT_SERVER_SINK_GET_PRIVATE (self); + GList *clients = priv->clients; + + GST_OBJECT_LOCK (sink); + while (clients != NULL) { + SRTClient *client = clients->data; + clients = clients->next; + + if (srt_sendmsg2 (client->sock, (char *) mapinfo->data, mapinfo->size, + 0) == SRT_ERROR) { + GST_WARNING_OBJECT (self, "%s", srt_getlasterror_str ()); + + priv->clients = g_list_remove (priv->clients, client); + GST_OBJECT_UNLOCK (sink); + g_signal_emit (self, signals[SIG_CLIENT_REMOVED], 0, client->sock, + client->sockaddr); + srt_client_free (client); + GST_OBJECT_LOCK (sink); + } + } + GST_OBJECT_UNLOCK (sink); + + return TRUE; +} + +static gboolean +gst_srt_server_sink_stop (GstBaseSink * sink) +{ + GstSRTServerSink *self = GST_SRT_SERVER_SINK (sink); + GstSRTServerSinkPrivate *priv = GST_SRT_SERVER_SINK_GET_PRIVATE (self); + gboolean ret = TRUE; + GList *clients; + + GST_DEBUG_OBJECT (self, "closing client sockets"); + + GST_OBJECT_LOCK (sink); + clients = priv->clients; + priv->clients = NULL; + GST_OBJECT_UNLOCK (sink); + + g_list_foreach (clients, (GFunc) srt_emit_client_removed, self); + g_list_free_full (clients, (GDestroyNotify) srt_client_free); + + GST_DEBUG_OBJECT (self, "closing SRT connection"); + srt_epoll_remove_usock (priv->poll_id, priv->sock); + srt_epoll_release (priv->poll_id); + srt_close (priv->sock); + + if (priv->loop) { + g_main_loop_quit (priv->loop); + g_thread_join (priv->thread); + g_clear_pointer (&priv->loop, g_main_loop_unref); + g_clear_pointer (&priv->thread, g_thread_unref); + } + + if (priv->server_source) { + g_source_destroy (priv->server_source); + g_clear_pointer (&priv->server_source, g_source_unref); + } + + g_clear_pointer (&priv->context, g_main_context_unref); + + return ret; +} + +static gboolean +gst_srt_server_sink_unlock (GstBaseSink * sink) +{ + GstSRTServerSink *self = GST_SRT_SERVER_SINK (sink); + GstSRTServerSinkPrivate *priv = GST_SRT_SERVER_SINK_GET_PRIVATE (self); + + priv->cancelled = TRUE; + + return TRUE; +} + +static gboolean +gst_srt_server_sink_unlock_stop (GstBaseSink * sink) +{ + GstSRTServerSink *self = GST_SRT_SERVER_SINK (sink); + GstSRTServerSinkPrivate *priv = GST_SRT_SERVER_SINK_GET_PRIVATE (self); + + priv->cancelled = FALSE; + + return TRUE; +} + +static void +gst_srt_server_sink_class_init (GstSRTServerSinkClass * klass) +{ + GObjectClass *gobject_class = G_OBJECT_CLASS (klass); + GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass); + GstBaseSinkClass *gstbasesink_class = GST_BASE_SINK_CLASS (klass); + GstSRTBaseSinkClass *gstsrtbasesink_class = GST_SRT_BASE_SINK_CLASS (klass); + + gobject_class->set_property = gst_srt_server_sink_set_property; + gobject_class->get_property = gst_srt_server_sink_get_property; + + properties[PROP_POLL_TIMEOUT] = + g_param_spec_int ("poll-timeout", "Poll Timeout", + "Return poll wait after timeout miliseconds (-1 = infinite)", -1, + G_MAXINT32, SRT_DEFAULT_POLL_TIMEOUT, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS); + + properties[PROP_STATS] = gst_param_spec_array ("stats", "Statistics", + "Array of GstStructures containing SRT statistics", + g_param_spec_boxed ("stats", "Statistics", + "Statistics for one client", GST_TYPE_STRUCTURE, + G_PARAM_READABLE | G_PARAM_STATIC_STRINGS), + G_PARAM_READABLE | G_PARAM_STATIC_STRINGS); + + g_object_class_install_properties (gobject_class, PROP_LAST, properties); + + /** + * GstSRTServerSink::client-added: + * @gstsrtserversink: the srtserversink element that emitted this signal + * @sock: the client socket descriptor that was added to srtserversink + * @addr: the pointer of "struct sockaddr" that describes the @sock + * @addr_len: the length of @addr + * + * The given socket descriptor was added to srtserversink. + */ + signals[SIG_CLIENT_ADDED] = + g_signal_new ("client-added", G_TYPE_FROM_CLASS (klass), + G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstSRTServerSinkClass, client_added), + NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, + 2, G_TYPE_INT, G_TYPE_SOCKET_ADDRESS); + + /** + * GstSRTServerSink::client-removed: + * @gstsrtserversink: the srtserversink element that emitted this signal + * @sock: the client socket descriptor that was added to srtserversink + * @addr: the pointer of "struct sockaddr" that describes the @sock + * @addr_len: the length of @addr + * + * The given socket descriptor was removed from srtserversink. + */ + signals[SIG_CLIENT_REMOVED] = + g_signal_new ("client-removed", G_TYPE_FROM_CLASS (klass), + G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstSRTServerSinkClass, + client_removed), NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, + 2, G_TYPE_INT, G_TYPE_SOCKET_ADDRESS); + + gst_element_class_add_static_pad_template (gstelement_class, &sink_template); + gst_element_class_set_metadata (gstelement_class, + "SRT server sink", "Sink/Network", + "Send data over the network via SRT", + "Justin Kim "); + + gstbasesink_class->start = GST_DEBUG_FUNCPTR (gst_srt_server_sink_start); + gstbasesink_class->stop = GST_DEBUG_FUNCPTR (gst_srt_server_sink_stop); + gstbasesink_class->unlock = GST_DEBUG_FUNCPTR (gst_srt_server_sink_unlock); + gstbasesink_class->unlock_stop = + GST_DEBUG_FUNCPTR (gst_srt_server_sink_unlock_stop); + + gstsrtbasesink_class->send_buffer = + GST_DEBUG_FUNCPTR (gst_srt_server_sink_send_buffer); +} + +static void +gst_srt_server_sink_init (GstSRTServerSink * self) +{ + GstSRTServerSinkPrivate *priv = GST_SRT_SERVER_SINK_GET_PRIVATE (self); + priv->poll_timeout = SRT_DEFAULT_POLL_TIMEOUT; +} diff --git a/ext/srt/gstsrtserversink.h b/ext/srt/gstsrtserversink.h new file mode 100644 index 0000000000..10eb52bafd --- /dev/null +++ b/ext/srt/gstsrtserversink.h @@ -0,0 +1,63 @@ +/* GStreamer + * Copyright (C) 2017, Collabora Ltd. + * Author:Justin Kim + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +#ifndef __GST_SRT_SERVER_SINK_H__ +#define __GST_SRT_SERVER_SINK_H__ + +#include "gstsrtbasesink.h" +#include + +G_BEGIN_DECLS + +#define GST_TYPE_SRT_SERVER_SINK (gst_srt_server_sink_get_type ()) +#define GST_IS_SRT_SERVER_SINK(obj) (G_TYPE_CHECK_INSTANCE_TYPE ((obj), GST_TYPE_SRT_SERVER_SINK)) +#define GST_IS_SRT_SERVER_SINK_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE ((klass), GST_TYPE_SRT_SERVER_SINK)) +#define GST_SRT_SERVER_SINK_GET_CLASS(obj) (G_TYPE_INSTANCE_GET_CLASS ((obj), GST_TYPE_SRT_SERVER_SINK, GstSRTServerSinkClass)) +#define GST_SRT_SERVER_SINK(obj) (G_TYPE_CHECK_INSTANCE_CAST ((obj), GST_TYPE_SRT_SERVER_SINK, GstSRTServerSink)) +#define GST_SRT_SERVER_SINK_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST ((klass), GST_TYPE_SRT_SERVER_SINK, GstSRTServerSinkClass)) +#define GST_SRT_SERVER_SINK_CAST(obj) ((GstSRTServerSink*)(obj)) +#define GST_SRT_SERVER_SINK_CLASS_CAST(klass) ((GstSRTServerSinkClass*)(klass)) + +typedef struct _GstSRTServerSink GstSRTServerSink; +typedef struct _GstSRTServerSinkClass GstSRTServerSinkClass; +typedef struct _GstSRTServerSinkPrivate GstSRTServerSinkPrivate; + +struct _GstSRTServerSink { + GstSRTBaseSink parent; + + /*< private >*/ + gpointer _gst_reserved[GST_PADDING]; +}; + +struct _GstSRTServerSinkClass { + GstSRTBaseSinkClass parent_class; + + void (*client_added) (GstSRTServerSink *self, int sock, struct sockaddr *addr, int addr_len); + void (*client_removed) (GstSRTServerSink *self, int sock, struct sockaddr *addr, int addr_len); + + gpointer _gst_reserved[GST_PADDING_LARGE]; +}; + +GST_EXPORT +GType gst_srt_server_sink_get_type (void); + +G_END_DECLS + +#endif /* __GST_SRT_SERVER_SINK_H__ */ diff --git a/ext/srt/gstsrtserversrc.c b/ext/srt/gstsrtserversrc.c new file mode 100644 index 0000000000..9b0f586991 --- /dev/null +++ b/ext/srt/gstsrtserversrc.c @@ -0,0 +1,504 @@ +/* GStreamer SRT plugin based on libsrt + * Copyright (C) 2017, Collabora Ltd. + * Author:Justin Kim + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +/** + * SECTION:element-srtserversrc + * @title: srtserversrc + * + * srtserversrc is a network source that reads SRT + * packets from the network. Although SRT is a protocol based on UDP, srtserversrc works like + * a server socket of connection-oriented protocol, but it accepts to only one client connection. + * + * + * Examples + * |[ + * gst-launch-1.0 -v srtserversrc uri="srt://:7001" ! fakesink + * ]| This pipeline shows how to bind SRT server by setting #GstSRTServerSrc:uri property. + * + * + */ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#include "gstsrtserversrc.h" +#include "gstsrt.h" +#include +#include + +#define SRT_DEFAULT_POLL_TIMEOUT 100 + +static GstStaticPadTemplate src_template = GST_STATIC_PAD_TEMPLATE ("src", + GST_PAD_SRC, + GST_PAD_ALWAYS, + GST_STATIC_CAPS_ANY); + +#define GST_CAT_DEFAULT gst_debug_srt_server_src +GST_DEBUG_CATEGORY (GST_CAT_DEFAULT); + +struct _GstSRTServerSrcPrivate +{ + SRTSOCKET sock; + SRTSOCKET client_sock; + GSocketAddress *client_sockaddr; + + gint poll_id; + gint poll_timeout; + + gboolean has_client; + gboolean cancelled; +}; + +#define GST_SRT_SERVER_SRC_GET_PRIVATE(obj) \ + (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_SRT_SERVER_SRC, GstSRTServerSrcPrivate)) + +enum +{ + PROP_POLL_TIMEOUT = 1, + + /*< private > */ + PROP_LAST +}; + +static GParamSpec *properties[PROP_LAST]; + +enum +{ + SIG_CLIENT_ADDED, + SIG_CLIENT_CLOSED, + + LAST_SIGNAL +}; + +static guint signals[LAST_SIGNAL] = { 0 }; + +#define gst_srt_server_src_parent_class parent_class +G_DEFINE_TYPE_WITH_CODE (GstSRTServerSrc, gst_srt_server_src, + GST_TYPE_SRT_BASE_SRC, G_ADD_PRIVATE (GstSRTServerSrc) + GST_DEBUG_CATEGORY_INIT (GST_CAT_DEFAULT, "srtserversrc", 0, + "SRT Server Source")); + +static void +gst_srt_server_src_get_property (GObject * object, + guint prop_id, GValue * value, GParamSpec * pspec) +{ + GstSRTServerSrc *self = GST_SRT_SERVER_SRC (object); + GstSRTServerSrcPrivate *priv = GST_SRT_SERVER_SRC_GET_PRIVATE (self); + + switch (prop_id) { + case PROP_POLL_TIMEOUT: + g_value_set_int (value, priv->poll_timeout); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +static void +gst_srt_server_src_set_property (GObject * object, + guint prop_id, const GValue * value, GParamSpec * pspec) +{ + GstSRTServerSrc *self = GST_SRT_SERVER_SRC (object); + GstSRTServerSrcPrivate *priv = GST_SRT_SERVER_SRC_GET_PRIVATE (self); + + switch (prop_id) { + case PROP_POLL_TIMEOUT: + priv->poll_timeout = g_value_get_int (value); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +static void +gst_srt_server_src_finalize (GObject * object) +{ + GstSRTServerSrc *self = GST_SRT_SERVER_SRC (object); + GstSRTServerSrcPrivate *priv = GST_SRT_SERVER_SRC_GET_PRIVATE (self); + + if (priv->poll_id != SRT_ERROR) { + srt_epoll_release (priv->poll_id); + priv->poll_id = SRT_ERROR; + } + + if (priv->sock != SRT_ERROR) { + srt_close (priv->sock); + priv->sock = SRT_ERROR; + } + + G_OBJECT_CLASS (parent_class)->finalize (object); +} + +static GstFlowReturn +gst_srt_server_src_fill (GstPushSrc * src, GstBuffer * outbuf) +{ + GstSRTServerSrc *self = GST_SRT_SERVER_SRC (src); + GstSRTServerSrcPrivate *priv = GST_SRT_SERVER_SRC_GET_PRIVATE (self); + GstFlowReturn ret = GST_FLOW_OK; + GstMapInfo info; + SRTSOCKET ready[2]; + gint recv_len; + struct sockaddr client_sa; + size_t client_sa_len; + + while (!priv->has_client) { + GST_DEBUG_OBJECT (self, "poll wait (timeout: %d)", priv->poll_timeout); + + /* Make SRT server socket non-blocking */ + srt_setsockopt (priv->sock, 0, SRTO_SNDSYN, &(int) { + 0}, sizeof (int)); + + if (srt_epoll_wait (priv->poll_id, ready, &(int) { + 2}, 0, 0, priv->poll_timeout, 0, 0, 0, 0) == -1) { + int srt_errno = srt_getlasterror (NULL); + + /* Assuming that timeout error is normal */ + if (srt_errno != SRT_ETIMEOUT) { + GST_ELEMENT_ERROR (src, RESOURCE, FAILED, + ("SRT error: %s", srt_getlasterror_str ()), (NULL)); + + return GST_FLOW_ERROR; + } + + /* Mimicking cancellable */ + if (srt_errno == SRT_ETIMEOUT && priv->cancelled) { + GST_DEBUG_OBJECT (self, "Cancelled waiting for client"); + return GST_FLOW_FLUSHING; + } + + continue; + } + + priv->client_sock = + srt_accept (priv->sock, &client_sa, (int *) &client_sa_len); + + GST_DEBUG_OBJECT (self, "checking client sock"); + if (priv->client_sock == SRT_INVALID_SOCK) { + GST_WARNING_OBJECT (self, + "detected invalid SRT client socket (reason: %s)", + srt_getlasterror_str ()); + srt_clearlasterror (); + } else { + priv->has_client = TRUE; + g_clear_object (&priv->client_sockaddr); + priv->client_sockaddr = g_socket_address_new_from_native (&client_sa, + client_sa_len); + g_signal_emit (self, signals[SIG_CLIENT_ADDED], 0, + priv->client_sock, priv->client_sockaddr); + } + } + + GST_DEBUG_OBJECT (self, "filling buffer"); + + if (!gst_buffer_map (outbuf, &info, GST_MAP_WRITE)) { + GST_ELEMENT_ERROR (src, RESOURCE, WRITE, + ("Could not map the output stream"), (NULL)); + ret = GST_FLOW_ERROR; + goto out; + } + + recv_len = srt_recvmsg (priv->client_sock, (char *) info.data, + gst_buffer_get_size (outbuf)); + + gst_buffer_unmap (outbuf, &info); + + if (recv_len == SRT_ERROR) { + GST_WARNING_OBJECT (self, "%s", srt_getlasterror_str ()); + + g_signal_emit (self, signals[SIG_CLIENT_CLOSED], 0, + priv->client_sock, priv->client_sockaddr); + + srt_close (priv->client_sock); + priv->client_sock = SRT_INVALID_SOCK; + g_clear_object (&priv->client_sockaddr); + priv->has_client = FALSE; + gst_buffer_resize (outbuf, 0, 0); + ret = GST_FLOW_OK; + goto out; + } else if (recv_len == 0) { + ret = GST_FLOW_EOS; + goto out; + } + + GST_BUFFER_PTS (outbuf) = + gst_clock_get_time (GST_ELEMENT_CLOCK (src)) - + GST_ELEMENT_CAST (src)->base_time; + + gst_buffer_resize (outbuf, 0, recv_len); + + GST_LOG_OBJECT (src, + "filled buffer from _get of size %" G_GSIZE_FORMAT ", ts %" + GST_TIME_FORMAT ", dur %" GST_TIME_FORMAT + ", offset %" G_GINT64_FORMAT ", offset_end %" G_GINT64_FORMAT, + gst_buffer_get_size (outbuf), + GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (outbuf)), + GST_TIME_ARGS (GST_BUFFER_DURATION (outbuf)), + GST_BUFFER_OFFSET (outbuf), GST_BUFFER_OFFSET_END (outbuf)); + +out: + return ret; +} + +static gboolean +gst_srt_server_src_start (GstBaseSrc * src) +{ + GstSRTServerSrc *self = GST_SRT_SERVER_SRC (src); + GstSRTServerSrcPrivate *priv = GST_SRT_SERVER_SRC_GET_PRIVATE (self); + GstSRTBaseSrc *base = GST_SRT_BASE_SRC (src); + GstUri *uri = gst_uri_ref (base->uri); + GError *error = NULL; + struct sockaddr sa; + size_t sa_len; + GSocketAddress *socket_address; + const gchar *host; + int lat = base->latency; + + if (gst_uri_get_port (uri) == GST_URI_NO_PORT) { + GST_ELEMENT_ERROR (src, RESOURCE, OPEN_WRITE, NULL, (("Invalid port"))); + return FALSE; + } + + host = gst_uri_get_host (uri); + if (host == NULL) { + GInetAddress *any = g_inet_address_new_any (G_SOCKET_FAMILY_IPV4); + + socket_address = g_inet_socket_address_new (any, gst_uri_get_port (uri)); + g_object_unref (any); + } else { + socket_address = + g_inet_socket_address_new_from_string (host, gst_uri_get_port (uri)); + } + + if (socket_address == NULL) { + GST_ELEMENT_ERROR (self, RESOURCE, OPEN_READ, ("Invalid URI"), + ("failed to extract host or port from the given URI")); + goto failed; + } + + sa_len = g_socket_address_get_native_size (socket_address); + if (!g_socket_address_to_native (socket_address, &sa, sa_len, &error)) { + GST_ELEMENT_ERROR (self, RESOURCE, OPEN_READ, ("Invalid URI"), + ("cannot resolve address (reason: %s)", error->message)); + goto failed; + } + + priv->sock = srt_socket (sa.sa_family, SOCK_DGRAM, 0); + if (priv->sock == SRT_ERROR) { + GST_ELEMENT_ERROR (self, LIBRARY, INIT, (NULL), + ("failed to create poll id for SRT socket (reason: %s)", + srt_getlasterror_str ())); + goto failed; + } + + /* Make sure TSBPD mode is enable (SRT mode) */ + srt_setsockopt (priv->sock, 0, SRTO_TSBPDMODE, &(int) { + 1}, sizeof (int)); + + /* This is a sink, we're always a receiver */ + srt_setsockopt (priv->sock, 0, SRTO_SENDER, &(int) { + 0}, sizeof (int)); + + srt_setsockopt (priv->sock, 0, SRTO_TSBPDDELAY, &lat, sizeof (int)); + + priv->poll_id = srt_epoll_create (); + if (priv->poll_id == -1) { + GST_ELEMENT_ERROR (self, LIBRARY, INIT, (NULL), + ("failed to create poll id for SRT socket (reason: %s)", + srt_getlasterror_str ())); + goto failed; + } + + srt_epoll_add_usock (priv->poll_id, priv->sock, &(int) { + SRT_EPOLL_IN}); + + if (srt_bind (priv->sock, &sa, sa_len) == SRT_ERROR) { + GST_ELEMENT_ERROR (self, RESOURCE, OPEN_READ, (NULL), + ("failed to bind SRT server socket (reason: %s)", + srt_getlasterror_str ())); + goto failed; + } + + if (srt_listen (priv->sock, 1) == SRT_ERROR) { + GST_ELEMENT_ERROR (self, RESOURCE, OPEN_READ, (NULL), + ("failed to listen SRT socket (reason: %s)", srt_getlasterror_str ())); + goto failed; + } + + g_clear_pointer (&uri, gst_uri_unref); + g_clear_object (&socket_address); + + return TRUE; + +failed: + if (priv->poll_id != SRT_ERROR) { + srt_epoll_release (priv->poll_id); + priv->poll_id = SRT_ERROR; + } + + if (priv->sock != SRT_ERROR) { + srt_close (priv->sock); + priv->sock = SRT_ERROR; + } + + g_clear_error (&error); + g_clear_pointer (&uri, gst_uri_unref); + g_clear_object (&socket_address); + + return FALSE; +} + +static gboolean +gst_srt_server_src_stop (GstBaseSrc * src) +{ + GstSRTServerSrc *self = GST_SRT_SERVER_SRC (src); + GstSRTServerSrcPrivate *priv = GST_SRT_SERVER_SRC_GET_PRIVATE (self); + + if (priv->client_sock != SRT_INVALID_SOCK) { + g_signal_emit (self, signals[SIG_CLIENT_ADDED], 0, + priv->client_sock, priv->client_sockaddr); + srt_close (priv->client_sock); + g_clear_object (&priv->client_sockaddr); + priv->client_sock = SRT_INVALID_SOCK; + priv->has_client = FALSE; + } + + if (priv->poll_id != SRT_ERROR) { + srt_epoll_remove_usock (priv->poll_id, priv->sock); + srt_epoll_release (priv->poll_id); + priv->poll_id = SRT_ERROR; + } + + if (priv->sock != SRT_INVALID_SOCK) { + GST_DEBUG_OBJECT (self, "closing SRT connection"); + srt_close (priv->sock); + priv->sock = SRT_INVALID_SOCK; + } + + priv->cancelled = FALSE; + + return TRUE; +} + +static gboolean +gst_srt_server_src_unlock (GstBaseSrc * src) +{ + GstSRTServerSrc *self = GST_SRT_SERVER_SRC (src); + GstSRTServerSrcPrivate *priv = GST_SRT_SERVER_SRC_GET_PRIVATE (self); + + priv->cancelled = TRUE; + + return TRUE; +} + +static gboolean +gst_srt_server_src_unlock_stop (GstBaseSrc * src) +{ + GstSRTServerSrc *self = GST_SRT_SERVER_SRC (src); + GstSRTServerSrcPrivate *priv = GST_SRT_SERVER_SRC_GET_PRIVATE (self); + + priv->cancelled = FALSE; + + return TRUE; +} + +static void +gst_srt_server_src_class_init (GstSRTServerSrcClass * klass) +{ + GObjectClass *gobject_class = G_OBJECT_CLASS (klass); + GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass); + GstBaseSrcClass *gstbasesrc_class = GST_BASE_SRC_CLASS (klass); + GstPushSrcClass *gstpushsrc_class = GST_PUSH_SRC_CLASS (klass); + + gobject_class->set_property = gst_srt_server_src_set_property; + gobject_class->get_property = gst_srt_server_src_get_property; + gobject_class->finalize = gst_srt_server_src_finalize; + + /** + * GstSRTServerSrc:poll-timeout: + * + * The timeout(ms) value when polling SRT socket. For #GstSRTServerSrc, + * this value shouldn't be set as -1 (infinite) because "srt_epoll_wait" + * isn't cancellable unless closing the socket. + */ + properties[PROP_POLL_TIMEOUT] = + g_param_spec_int ("poll-timeout", "Poll timeout", + "Return poll wait after timeout miliseconds", 0, G_MAXINT32, + SRT_DEFAULT_POLL_TIMEOUT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS); + + g_object_class_install_properties (gobject_class, PROP_LAST, properties); + + /** + * GstSRTServerSrc::client-added: + * @gstsrtserversrc: the srtserversrc element that emitted this signal + * @sock: the client socket descriptor that was added to srtserversrc + * @addr: the pointer of "struct sockaddr" that describes the @sock + * @addr_len: the length of @addr + * + * The given socket descriptor was added to srtserversrc. + */ + signals[SIG_CLIENT_ADDED] = + g_signal_new ("client-added", G_TYPE_FROM_CLASS (klass), + G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstSRTServerSrcClass, client_added), + NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, + 2, G_TYPE_INT, G_TYPE_SOCKET_ADDRESS); + + /** + * GstSRTServerSrc::client-closed: + * @gstsrtserversrc: the srtserversrc element that emitted this signal + * @sock: the client socket descriptor that was added to srtserversrc + * @addr: the pointer of "struct sockaddr" that describes the @sock + * @addr_len: the length of @addr + * + * The given socket descriptor was closed. + */ + signals[SIG_CLIENT_CLOSED] = + g_signal_new ("client-closed", G_TYPE_FROM_CLASS (klass), + G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstSRTServerSrcClass, client_closed), + NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, + 2, G_TYPE_INT, G_TYPE_SOCKET_ADDRESS); + + gst_element_class_add_static_pad_template (gstelement_class, &src_template); + gst_element_class_set_metadata (gstelement_class, + "SRT Server source", "Source/Network", + "Receive data over the network via SRT", + "Justin Kim "); + + gstbasesrc_class->start = GST_DEBUG_FUNCPTR (gst_srt_server_src_start); + gstbasesrc_class->stop = GST_DEBUG_FUNCPTR (gst_srt_server_src_stop); + gstbasesrc_class->unlock = GST_DEBUG_FUNCPTR (gst_srt_server_src_unlock); + gstbasesrc_class->unlock_stop = + GST_DEBUG_FUNCPTR (gst_srt_server_src_unlock_stop); + + gstpushsrc_class->fill = GST_DEBUG_FUNCPTR (gst_srt_server_src_fill); +} + +static void +gst_srt_server_src_init (GstSRTServerSrc * self) +{ + GstSRTServerSrcPrivate *priv = GST_SRT_SERVER_SRC_GET_PRIVATE (self); + + priv->sock = SRT_INVALID_SOCK; + priv->client_sock = SRT_INVALID_SOCK; + priv->poll_id = SRT_ERROR; + priv->poll_timeout = SRT_DEFAULT_POLL_TIMEOUT; +} diff --git a/ext/srt/gstsrtserversrc.h b/ext/srt/gstsrtserversrc.h new file mode 100644 index 0000000000..098b5cdbf7 --- /dev/null +++ b/ext/srt/gstsrtserversrc.h @@ -0,0 +1,63 @@ +/* GStreamer + * Copyright (C) 2017, Collabora Ltd. + * Author:Justin Kim + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +#ifndef __GST_SRT_SERVER_SRC_H__ +#define __GST_SRT_SERVER_SRC_H__ + +#include "gstsrtbasesrc.h" +#include + +G_BEGIN_DECLS + +#define GST_TYPE_SRT_SERVER_SRC (gst_srt_server_src_get_type ()) +#define GST_IS_SRT_SERVER_SRC(obj) (G_TYPE_CHECK_INSTANCE_TYPE ((obj), GST_TYPE_SRT_SERVER_SRC)) +#define GST_IS_SRT_SERVER_SRC_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE ((klass), GST_TYPE_SRT_SERVER_SRC)) +#define GST_SRT_SERVER_SRC_GET_CLASS(obj) (G_TYPE_INSTANCE_GET_CLASS ((obj), GST_TYPE_SRT_SERVER_SRC, GstSRTServerSrcClass)) +#define GST_SRT_SERVER_SRC(obj) (G_TYPE_CHECK_INSTANCE_CAST ((obj), GST_TYPE_SRT_SERVER_SRC, GstSRTServerSrc)) +#define GST_SRT_SERVER_SRC_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST ((klass), GST_TYPE_SRT_SERVER_SRC, GstSRTServerSrcClass)) +#define GST_SRT_SERVER_SRC_CAST(obj) ((GstSRTServerSrc*)(obj)) +#define GST_SRT_SERVER_SRC_CLASS_CAST(klass) ((GstSRTServerSrcClass*)(klass)) + +typedef struct _GstSRTServerSrc GstSRTServerSrc; +typedef struct _GstSRTServerSrcClass GstSRTServerSrcClass; +typedef struct _GstSRTServerSrcPrivate GstSRTServerSrcPrivate; + +struct _GstSRTServerSrc { + GstSRTBaseSrc parent; + + /*< private >*/ + gpointer _gst_reserved[GST_PADDING]; +}; + +struct _GstSRTServerSrcClass { + GstSRTBaseSrcClass parent_class; + + void (*client_added) (GstSRTServerSrc *self, int sock, struct sockaddr *addr, int addr_len); + void (*client_closed) (GstSRTServerSrc *self, int sock, struct sockaddr *addr, int addr_len); + + gpointer _gst_reserved[GST_PADDING_LARGE]; +}; + +GST_EXPORT +GType gst_srt_server_src_get_type (void); + +G_END_DECLS + +#endif /* __GST_SRT_SERVER_SRC_H__ */ diff --git a/ext/srt/meson.build b/ext/srt/meson.build new file mode 100644 index 0000000000..0d2835fb7c --- /dev/null +++ b/ext/srt/meson.build @@ -0,0 +1,27 @@ +srt_sources = [ + 'gstsrt.c', + 'gstsrtbasesrc.c', + 'gstsrtclientsrc.c', + 'gstsrtserversrc.c', + 'gstsrtbasesink.c', + 'gstsrtclientsink.c', + 'gstsrtserversink.c', +] + +srt_dep = dependency('libsrt', required : false) + +if not srt_dep.found() and cc.has_header_symbol('srt/srt.h', 'srt_startup') + srt_dep = cc.find_library('srt', required : false) +endif + +if srt_dep.found() + gstsrt = library('gstsrt', + srt_sources, + c_args : gst_plugins_bad_args, + link_args : noseh_link_args, + include_directories : [configinc, libsinc], + dependencies : [gstbase_dep, gio_dep, srt_dep], + install : true, + install_dir : plugins_install_dir, + ) +endif