gstreamer/subprojects/gst-plugins-bad/gst/rtmp2/gstrtmp2sink.c
Jordan Petridis b6c577c70c rtmp2: reimplement librtmp's connection parameters for the connect packet
librtmp allows for attaching arbitrary AMF objects to the end of the
connect packet, and this is commonly used for authenticating with
servers.

Add a new property, extra-connect-args, that mimics librtmp's behavior.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/7054>
2024-08-13 21:50:17 +00:00

1266 lines
36 KiB
C

/* GStreamer
* Copyright (C) 2014 David Schleef <ds@schleef.org>
* Copyright (C) 2017 Make.TV, Inc. <info@make.tv>
* Contact: Jan Alexander Steffens (heftig) <jsteffens@make.tv>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
* Boston, MA 02110-1335, USA.
*/
/**
* SECTION:element-rtmp2sink
*
* The rtmp2sink element sends audio and video streams to an RTMP
* server.
*
* <refsect2>
* <title>Example launch line</title>
* |[
* gst-launch-1.0 -v videotestsrc ! x264enc ! flvmux ! rtmp2sink
* location=rtmp://server.example.com/live/myStream
* ]|
* Encodes a test video pattern as H.264, muxes it into FLV and publishes the
* result to the given RTMP server location.
* </refsect2>
*/
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#include "gstrtmp2elements.h"
#include "gstrtmp2sink.h"
#include "gstrtmp2locationhandler.h"
#include "rtmp/amf.h"
#include "rtmp/rtmpclient.h"
#include "rtmp/rtmpmessage.h"
#include "rtmp/rtmputils.h"
#include <gst/gst.h>
#include <gst/base/gstbasesink.h>
#include <gio/gnetworking.h>
#include <string.h>
/* Debug category of this element; it is made the default category for the
 * logging macros used in this file via GST_CAT_DEFAULT. */
GST_DEBUG_CATEGORY_STATIC (gst_rtmp2_sink_debug_category);
#define GST_CAT_DEFAULT gst_rtmp2_sink_debug_category
/* Instance cast and instance type-check macros for GstRtmp2Sink */
#define GST_RTMP2_SINK(obj) (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_RTMP2_SINK,GstRtmp2Sink))
#define GST_IS_RTMP2_SINK(obj) (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_RTMP2_SINK))
/* Instance structure of the rtmp2sink element. */
typedef struct
{
GstBaseSink parent_instance;
/* properties */
/* Connection parameters; handed to gst_rtmp_client_connect_async() by the
 * task function. Accessed under the object lock. */
GstRtmpLocation location;
/* TRUE: start() launches the connect task; FALSE: the first render() does */
gboolean async_connect;
/* Peak bitrate in kbit/s; 0 is mapped to a pacing rate of -1 by
 * set_pacing_rate() */
guint peak_kbps;
/* RTMP chunk size pushed to the connection by set_chunk_size() */
guint32 chunk_size;
/* Commands passed to gst_rtmp_client_stop_publish() when EOS is received */
GstRtmpStopCommands stop_commands;
/* Stats snapshot taken from the connection when the task's loop exits;
 * returned by the "stats" property while there is no connection */
GstStructure *stats;
/* If both self->lock and OBJECT_LOCK are needed,
 * self->lock must be taken first */
GMutex lock;
/* Woken on connection changes, unlock(), stop and output progress */
GCond cond;
/* running: set in start(), cleared by stop_task().
 * flushing: set by unlock(), cleared by unlock_stop(). */
gboolean running, flushing;
/* Task running gst_rtmp2_sink_task_func(), and the lock it is bound to */
GstTask *task;
GRecMutex task_lock;
/* Main loop and context created and torn down by the task function */
GMainLoop *loop;
GMainContext *context;
/* Created in start(); dropped by connect_task_done() once connecting ended */
GCancellable *cancellable;
/* Set by connect_task_done() on success; closed when the task loop exits */
GstRtmpConnection *connection;
/* Filled in by gst_rtmp_client_start_publish_finish(); stamped on every
 * outgoing message by send_message() */
guint32 stream_id;
/* RTMP messages built from the "streamheader" caps field */
GPtrArray *headers;
guint64 last_ts, base_ts; /* timestamp fixup */
} GstRtmp2Sink;
/* Class structure of the rtmp2sink element; adds nothing to GstBaseSinkClass. */
typedef struct
{
GstBaseSinkClass parent_class;
} GstRtmp2SinkClass;
/* Forward declarations */
/* GObject virtual functions */
static void gst_rtmp2_sink_set_property (GObject * object,
guint property_id, const GValue * value, GParamSpec * pspec);
static void gst_rtmp2_sink_get_property (GObject * object,
guint property_id, GValue * value, GParamSpec * pspec);
static void gst_rtmp2_sink_finalize (GObject * object);
/* GstURIHandler interface initializer */
static void gst_rtmp2_sink_uri_handler_init (GstURIHandlerInterface * iface);
/* GstBaseSink virtual functions */
static gboolean gst_rtmp2_sink_start (GstBaseSink * sink);
static gboolean gst_rtmp2_sink_stop (GstBaseSink * sink);
static gboolean gst_rtmp2_sink_event (GstBaseSink * sink, GstEvent * event);
static gboolean gst_rtmp2_sink_unlock (GstBaseSink * sink);
static gboolean gst_rtmp2_sink_unlock_stop (GstBaseSink * sink);
static GstFlowReturn gst_rtmp2_sink_render (GstBaseSink * sink,
GstBuffer * buffer);
static gboolean gst_rtmp2_sink_set_caps (GstBaseSink * sink, GstCaps * caps);
/* Internal API: connect task, its async callbacks, and helpers that push
 * property values to the live connection */
static void gst_rtmp2_sink_task_func (gpointer user_data);
static void client_connect_done (GObject * source, GAsyncResult * result,
gpointer user_data);
static void start_publish_done (GObject * source, GAsyncResult * result,
gpointer user_data);
static void connect_task_done (GObject * object, GAsyncResult * result,
gpointer user_data);
static void set_pacing_rate (GstRtmp2Sink * self);
static void set_chunk_size (GstRtmp2Sink * self);
static GstStructure *gst_rtmp2_sink_get_stats (GstRtmp2Sink * self);
/* Property IDs. PROP_LOCATION through PROP_FLASH_VERSION are registered with
 * g_object_class_override_property() in class_init; the remaining ones are
 * installed by this class. */
enum
{
PROP_0,
PROP_LOCATION,
PROP_SCHEME,
PROP_HOST,
PROP_PORT,
PROP_APPLICATION,
PROP_STREAM,
PROP_SECURE_TOKEN,
PROP_USERNAME,
PROP_PASSWORD,
PROP_AUTHMOD,
PROP_TIMEOUT,
PROP_TLS_VALIDATION_FLAGS,
PROP_FLASH_VERSION,
PROP_ASYNC_CONNECT,
PROP_PEAK_KBPS,
PROP_CHUNK_SIZE,
PROP_STATS,
PROP_STOP_COMMANDS,
PROP_EXTRA_CONNECT_ARGS,
};
/* pad templates */
/* Single always-present sink pad accepting "video/x-flv" caps */
static GstStaticPadTemplate gst_rtmp2_sink_sink_template =
GST_STATIC_PAD_TEMPLATE ("sink",
GST_PAD_SINK,
GST_PAD_ALWAYS,
GST_STATIC_CAPS ("video/x-flv")
);
/* class initialization */
/* Defines the GstRtmp2Sink type as a GstBaseSink subclass that implements
 * the GstURIHandler interface (via gst_rtmp2_sink_uri_handler_init) and the
 * GstRtmpLocationHandler interface (no interface init function). */
G_DEFINE_TYPE_WITH_CODE (GstRtmp2Sink, gst_rtmp2_sink, GST_TYPE_BASE_SINK,
G_IMPLEMENT_INTERFACE (GST_TYPE_URI_HANDLER,
gst_rtmp2_sink_uri_handler_init);
G_IMPLEMENT_INTERFACE (GST_TYPE_RTMP_LOCATION_HANDLER, NULL));
/* Registers the element as "rtmp2sink" with rank GST_RANK_PRIMARY + 1;
 * rtmp2_element_init (plugin) runs as part of the registration code. */
GST_ELEMENT_REGISTER_DEFINE_WITH_CODE (rtmp2sink, "rtmp2sink",
GST_RANK_PRIMARY + 1, GST_TYPE_RTMP2_SINK, rtmp2_element_init (plugin));
/* Class initializer: wires up the GObject and GstBaseSink virtual methods,
 * registers the sink pad template and element metadata, and registers every
 * property of the element (overridden as well as newly installed ones). */
static void
gst_rtmp2_sink_class_init (GstRtmp2SinkClass * klass)
{
  GObjectClass *oclass = G_OBJECT_CLASS (klass);
  GstBaseSinkClass *sink_class = GST_BASE_SINK_CLASS (klass);

  /* GObject hooks; must be in place before properties are registered */
  oclass->finalize = gst_rtmp2_sink_finalize;
  oclass->get_property = gst_rtmp2_sink_get_property;
  oclass->set_property = gst_rtmp2_sink_set_property;

  /* GstBaseSink hooks */
  sink_class->start = GST_DEBUG_FUNCPTR (gst_rtmp2_sink_start);
  sink_class->stop = GST_DEBUG_FUNCPTR (gst_rtmp2_sink_stop);
  sink_class->unlock = GST_DEBUG_FUNCPTR (gst_rtmp2_sink_unlock);
  sink_class->unlock_stop = GST_DEBUG_FUNCPTR (gst_rtmp2_sink_unlock_stop);
  sink_class->event = GST_DEBUG_FUNCPTR (gst_rtmp2_sink_event);
  sink_class->set_caps = GST_DEBUG_FUNCPTR (gst_rtmp2_sink_set_caps);
  sink_class->render = GST_DEBUG_FUNCPTR (gst_rtmp2_sink_render);

  /* Element description and pad */
  gst_element_class_set_static_metadata (GST_ELEMENT_CLASS (klass),
      "RTMP sink element", "Sink", "Sink element for RTMP streams",
      "Make.TV, Inc. <info@make.tv>");
  gst_element_class_add_static_pad_template (GST_ELEMENT_CLASS (klass),
      &gst_rtmp2_sink_sink_template);

  /* Overridden properties */
  g_object_class_override_property (oclass, PROP_LOCATION, "location");
  g_object_class_override_property (oclass, PROP_SCHEME, "scheme");
  g_object_class_override_property (oclass, PROP_HOST, "host");
  g_object_class_override_property (oclass, PROP_PORT, "port");
  g_object_class_override_property (oclass, PROP_APPLICATION, "application");
  g_object_class_override_property (oclass, PROP_STREAM, "stream");
  g_object_class_override_property (oclass, PROP_SECURE_TOKEN,
      "secure-token");
  g_object_class_override_property (oclass, PROP_USERNAME, "username");
  g_object_class_override_property (oclass, PROP_PASSWORD, "password");
  g_object_class_override_property (oclass, PROP_AUTHMOD, "authmod");
  g_object_class_override_property (oclass, PROP_TIMEOUT, "timeout");
  g_object_class_override_property (oclass, PROP_TLS_VALIDATION_FLAGS,
      "tls-validation-flags");
  g_object_class_override_property (oclass, PROP_FLASH_VERSION,
      "flash-version");

  /* Properties specific to the sink */
  g_object_class_install_property (oclass, PROP_ASYNC_CONNECT,
      g_param_spec_boolean ("async-connect", "Async connect",
          "Connect on READY, otherwise on first push", TRUE,
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));

  g_object_class_install_property (oclass, PROP_PEAK_KBPS,
      g_param_spec_uint ("peak-kbps", "Peak bitrate",
          "Bitrate in kbit/sec to pace outgoing packets", 0, G_MAXINT / 125,
          0,
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS |
          GST_PARAM_MUTABLE_PLAYING));

  g_object_class_install_property (oclass, PROP_CHUNK_SIZE,
      g_param_spec_uint ("chunk-size", "Chunk size", "RTMP chunk size",
          GST_RTMP_MINIMUM_CHUNK_SIZE, GST_RTMP_MAXIMUM_CHUNK_SIZE,
          GST_RTMP_DEFAULT_CHUNK_SIZE,
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS |
          GST_PARAM_MUTABLE_PLAYING));

  g_object_class_install_property (oclass, PROP_STATS,
      g_param_spec_boxed ("stats", "Stats", "Retrieve a statistics structure",
          GST_TYPE_STRUCTURE, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));

  /**
   * GstRtmp2Sink:stop-commands:
   *
   * Which commands (if any) to send on EOS event before closing connection
   *
   * Since: 1.20
   */
  g_object_class_install_property (oclass, PROP_STOP_COMMANDS,
      g_param_spec_flags ("stop-commands", "Stop commands",
          "RTMP commands to send on EOS event before closing connection",
          GST_TYPE_RTMP_STOP_COMMANDS, GST_RTMP_DEFAULT_STOP_COMMANDS,
          (GParamFlags) (G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)));

  /**
   * GstRtmp2Sink:extra-connect-args:
   *
   * Parse and append librtmp-style arbitrary data to the "connect" command.
   * It can be used for non-standard authentication with some servers.
   *
   * The format is a whitespace-separated series of "conn=type:data" strings.
   * Valid types are:
   *
   * - "B" for boolean with "0" and "1" for false and true, respectively, e.g.
   *   "conn=B:1".
   * - "N" for numbers in double format, e.g. "conn=N:1.23".
   * - "S" for strings, e.g. "conn=S:somepassword".
   * - "O" for objects and "N"-prefixed named values are not yet supported.
   *
   * Since: 1.26
   */
  g_object_class_install_property (oclass, PROP_EXTRA_CONNECT_ARGS,
      g_param_spec_string ("extra-connect-args",
          "librtmp-style arbitrary data",
          "librtmp-style arbitrary data to be appended to the \"connect\" command",
          NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));

  gst_type_mark_as_plugin_api (GST_TYPE_RTMP_LOCATION_HANDLER, 0);

  GST_DEBUG_CATEGORY_INIT (gst_rtmp2_sink_debug_category, "rtmp2sink", 0,
      "debug category for rtmp2sink element");
}
/* Instance initializer: sets up the synchronization primitives, the connect
 * task, the streamheader array and the property defaults. */
static void
gst_rtmp2_sink_init (GstRtmp2Sink * self)
{
  /* Synchronization primitives */
  g_mutex_init (&self->lock);
  g_cond_init (&self->cond);
  g_rec_mutex_init (&self->task_lock);

  /* Task that runs the connection's main loop; bound to task_lock */
  self->task = gst_task_new (gst_rtmp2_sink_task_func, self, NULL);
  gst_task_set_lock (self->task, &self->task_lock);

  /* Streamheader messages are unreffed when dropped from the array */
  self->headers = g_ptr_array_new_with_free_func
      ((GDestroyNotify) gst_mini_object_unref);

  /* Property defaults */
  self->async_connect = TRUE;
  self->chunk_size = GST_RTMP_DEFAULT_CHUNK_SIZE;
  self->stop_commands = GST_RTMP_DEFAULT_STOP_COMMANDS;
  self->location.publish = TRUE;
  self->location.flash_ver = g_strdup ("FMLE/3.0 (compatible; FMSc/1.0)");
}
/* GstURIHandler interface initializer: delegates to the shared RTMP location
 * handler implementation, declaring this element as a GST_URI_SINK. */
static void
gst_rtmp2_sink_uri_handler_init (GstURIHandlerInterface * iface)
{
gst_rtmp_location_handler_implement_uri_handler (iface, GST_URI_SINK);
}
static void
gst_rtmp2_sink_set_property (GObject * object, guint property_id,
const GValue * value, GParamSpec * pspec)
{
GstRtmp2Sink *self = GST_RTMP2_SINK (object);
switch (property_id) {
case PROP_LOCATION:
gst_rtmp_location_handler_set_uri (GST_RTMP_LOCATION_HANDLER (self),
g_value_get_string (value));
break;
case PROP_SCHEME:
GST_OBJECT_LOCK (self);
self->location.scheme = g_value_get_enum (value);
GST_OBJECT_UNLOCK (self);
break;
case PROP_HOST:
GST_OBJECT_LOCK (self);
g_free (self->location.host);
self->location.host = g_value_dup_string (value);
GST_OBJECT_UNLOCK (self);
break;
case PROP_PORT:
GST_OBJECT_LOCK (self);
self->location.port = g_value_get_int (value);
GST_OBJECT_UNLOCK (self);
break;
case PROP_APPLICATION:
GST_OBJECT_LOCK (self);
g_free (self->location.application);
self->location.application = g_value_dup_string (value);
GST_OBJECT_UNLOCK (self);
break;
case PROP_STREAM:
GST_OBJECT_LOCK (self);
g_free (self->location.stream);
self->location.stream = g_value_dup_string (value);
GST_OBJECT_UNLOCK (self);
break;
case PROP_SECURE_TOKEN:
GST_OBJECT_LOCK (self);
g_free (self->location.secure_token);
self->location.secure_token = g_value_dup_string (value);
GST_OBJECT_UNLOCK (self);
break;
case PROP_USERNAME:
GST_OBJECT_LOCK (self);
g_free (self->location.username);
self->location.username = g_value_dup_string (value);
GST_OBJECT_UNLOCK (self);
break;
case PROP_PASSWORD:
GST_OBJECT_LOCK (self);
g_free (self->location.password);
self->location.password = g_value_dup_string (value);
GST_OBJECT_UNLOCK (self);
break;
case PROP_AUTHMOD:
GST_OBJECT_LOCK (self);
self->location.authmod = g_value_get_enum (value);
GST_OBJECT_UNLOCK (self);
break;
case PROP_TIMEOUT:
GST_OBJECT_LOCK (self);
self->location.timeout = g_value_get_uint (value);
GST_OBJECT_UNLOCK (self);
break;
case PROP_TLS_VALIDATION_FLAGS:
GST_OBJECT_LOCK (self);
self->location.tls_flags = g_value_get_flags (value);
GST_OBJECT_UNLOCK (self);
break;
case PROP_FLASH_VERSION:
GST_OBJECT_LOCK (self);
g_free (self->location.flash_ver);
self->location.flash_ver = g_value_dup_string (value);
GST_OBJECT_UNLOCK (self);
break;
case PROP_ASYNC_CONNECT:
GST_OBJECT_LOCK (self);
self->async_connect = g_value_get_boolean (value);
GST_OBJECT_UNLOCK (self);
break;
case PROP_PEAK_KBPS:
g_mutex_lock (&self->lock);
GST_OBJECT_LOCK (self);
self->peak_kbps = g_value_get_uint (value);
GST_OBJECT_UNLOCK (self);
set_pacing_rate (self);
g_mutex_unlock (&self->lock);
break;
case PROP_CHUNK_SIZE:
g_mutex_lock (&self->lock);
GST_OBJECT_LOCK (self);
self->chunk_size = g_value_get_uint (value);
GST_OBJECT_UNLOCK (self);
set_chunk_size (self);
g_mutex_unlock (&self->lock);
break;
case PROP_STOP_COMMANDS:
GST_OBJECT_LOCK (self);
self->stop_commands = g_value_get_flags (value);
GST_OBJECT_UNLOCK (self);
break;
case PROP_EXTRA_CONNECT_ARGS:
GST_OBJECT_LOCK (self);
g_free (self->location.extra_connect_args);
self->location.extra_connect_args = g_value_dup_string (value);
GST_OBJECT_UNLOCK (self);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
break;
}
}
static void
gst_rtmp2_sink_get_property (GObject * object, guint property_id,
GValue * value, GParamSpec * pspec)
{
GstRtmp2Sink *self = GST_RTMP2_SINK (object);
switch (property_id) {
case PROP_LOCATION:
GST_OBJECT_LOCK (self);
g_value_take_string (value, gst_rtmp_location_get_string (&self->location,
TRUE));
GST_OBJECT_UNLOCK (self);
break;
case PROP_SCHEME:
GST_OBJECT_LOCK (self);
g_value_set_enum (value, self->location.scheme);
GST_OBJECT_UNLOCK (self);
break;
case PROP_HOST:
GST_OBJECT_LOCK (self);
g_value_set_string (value, self->location.host);
GST_OBJECT_UNLOCK (self);
break;
case PROP_PORT:
GST_OBJECT_LOCK (self);
g_value_set_int (value, self->location.port);
GST_OBJECT_UNLOCK (self);
break;
case PROP_APPLICATION:
GST_OBJECT_LOCK (self);
g_value_set_string (value, self->location.application);
GST_OBJECT_UNLOCK (self);
break;
case PROP_STREAM:
GST_OBJECT_LOCK (self);
g_value_set_string (value, self->location.stream);
GST_OBJECT_UNLOCK (self);
break;
case PROP_SECURE_TOKEN:
GST_OBJECT_LOCK (self);
g_value_set_string (value, self->location.secure_token);
GST_OBJECT_UNLOCK (self);
break;
case PROP_USERNAME:
GST_OBJECT_LOCK (self);
g_value_set_string (value, self->location.username);
GST_OBJECT_UNLOCK (self);
break;
case PROP_PASSWORD:
GST_OBJECT_LOCK (self);
g_value_set_string (value, self->location.password);
GST_OBJECT_UNLOCK (self);
break;
case PROP_AUTHMOD:
GST_OBJECT_LOCK (self);
g_value_set_enum (value, self->location.authmod);
GST_OBJECT_UNLOCK (self);
break;
case PROP_TIMEOUT:
GST_OBJECT_LOCK (self);
g_value_set_uint (value, self->location.timeout);
GST_OBJECT_UNLOCK (self);
break;
case PROP_TLS_VALIDATION_FLAGS:
GST_OBJECT_LOCK (self);
g_value_set_flags (value, self->location.tls_flags);
GST_OBJECT_UNLOCK (self);
break;
case PROP_FLASH_VERSION:
GST_OBJECT_LOCK (self);
g_value_set_string (value, self->location.flash_ver);
GST_OBJECT_UNLOCK (self);
break;
case PROP_ASYNC_CONNECT:
GST_OBJECT_LOCK (self);
g_value_set_boolean (value, self->async_connect);
GST_OBJECT_UNLOCK (self);
break;
case PROP_PEAK_KBPS:
GST_OBJECT_LOCK (self);
g_value_set_uint (value, self->peak_kbps);
GST_OBJECT_UNLOCK (self);
break;
case PROP_CHUNK_SIZE:
GST_OBJECT_LOCK (self);
g_value_set_uint (value, self->chunk_size);
GST_OBJECT_UNLOCK (self);
break;
case PROP_STATS:
g_value_take_boxed (value, gst_rtmp2_sink_get_stats (self));
break;
case PROP_STOP_COMMANDS:
GST_OBJECT_LOCK (self);
g_value_set_flags (value, self->stop_commands);
GST_OBJECT_UNLOCK (self);
break;
case PROP_EXTRA_CONNECT_ARGS:
GST_OBJECT_LOCK (self);
g_value_set_string (value, self->location.extra_connect_args);
GST_OBJECT_UNLOCK (self);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
break;
}
}
/* GObject finalize: releases everything owned by the instance, then chains
 * up to the parent class. */
static void
gst_rtmp2_sink_finalize (GObject * object)
{
  GstRtmp2Sink *self = GST_RTMP2_SINK (object);

  /* Owned data */
  g_clear_pointer (&self->headers, g_ptr_array_unref);
  g_clear_pointer (&self->stats, gst_structure_free);
  gst_rtmp_location_clear (&self->location);

  /* Connection state */
  g_clear_object (&self->cancellable);
  g_clear_object (&self->connection);

  /* Drop the task before the lock it was bound to */
  g_clear_object (&self->task);
  g_rec_mutex_clear (&self->task_lock);

  g_cond_clear (&self->cond);
  g_mutex_clear (&self->lock);

  G_OBJECT_CLASS (gst_rtmp2_sink_parent_class)->finalize (object);
}
/* GstBaseSink::start: resets the per-run state, creates a fresh cancellable
 * and, when "async-connect" is enabled, starts the connect task right away.
 * Otherwise the task is started by the first render() call. Always returns
 * TRUE. */
static gboolean
gst_rtmp2_sink_start (GstBaseSink * sink)
{
  GstRtmp2Sink *self = GST_RTMP2_SINK (sink);
  gboolean connect_now;

  GST_OBJECT_LOCK (self);
  connect_now = self->async_connect;
  GST_OBJECT_UNLOCK (self);

  GST_INFO_OBJECT (self, "Starting (%s)", connect_now ? "async" : "delayed");

  /* Replace any cancellable left over from a previous run */
  g_clear_object (&self->cancellable);
  self->running = TRUE;
  self->cancellable = g_cancellable_new ();

  /* Reset stream and timestamp-fixup state */
  self->base_ts = 0;
  self->last_ts = 0;
  self->stream_id = 0;

  if (connect_now) {
    gst_task_start (self->task);
  }

  return TRUE;
}
/* GSourceFunc that quits the main loop passed as user_data. Scheduled by
 * stop_task() on the task's main context; returns G_SOURCE_REMOVE so it runs
 * only once. */
static gboolean
quit_invoker (gpointer user_data)
{
g_main_loop_quit (user_data);
return G_SOURCE_REMOVE;
}
/* Requests the connect task to stop: stops the GstTask, clears the running
 * flag, cancels a pending connect and asks the task's main loop to quit.
 * Finally wakes everybody waiting on self->cond.
 *
 * Every caller in this file holds self->lock while calling this. */
static void
stop_task (GstRtmp2Sink * self)
{
gst_task_stop (self->task);
self->running = FALSE;
if (self->cancellable) {
GST_DEBUG_OBJECT (self, "Cancelling");
g_cancellable_cancel (self->cancellable);
}
if (self->loop) {
GST_DEBUG_OBJECT (self, "Stopping loop");
/* Quit from within the loop's own context; the extra loop reference keeps
 * the loop alive until the invocation has been run or destroyed */
g_main_context_invoke_full (self->context, G_PRIORITY_DEFAULT_IDLE,
quit_invoker, g_main_loop_ref (self->loop),
(GDestroyNotify) g_main_loop_unref);
}
g_cond_broadcast (&self->cond);
}
/* GstBaseSink::stop: requests the connect task to stop (under self->lock)
 * and then joins it. The lock is released before joining because the task
 * function itself takes self->lock. Always returns TRUE. */
static gboolean
gst_rtmp2_sink_stop (GstBaseSink * sink)
{
GstRtmp2Sink *self = GST_RTMP2_SINK (sink);
GST_DEBUG_OBJECT (self, "stop");
g_mutex_lock (&self->lock);
stop_task (self);
g_mutex_unlock (&self->lock);
gst_task_join (self->task);
return TRUE;
}
/* GSourceFunc invoked on the task's main context when EOS arrives. If a
 * connection exists and "stop-commands" is not NONE, the configured stop
 * commands are sent for the current stream. Runs once (G_SOURCE_REMOVE). */
static gboolean
stop_publish_invoker (gpointer user_data)
{
  GstRtmp2Sink *self = user_data;

  if (!self->connection) {
    return G_SOURCE_REMOVE;
  }

  /* stop_commands and location.stream are protected by the object lock */
  GST_OBJECT_LOCK (self);
  if (self->stop_commands != GST_RTMP_STOP_COMMANDS_NONE) {
    gst_rtmp_client_stop_publish (self->connection, self->location.stream,
        self->stop_commands);
  }
  GST_OBJECT_UNLOCK (self);

  return G_SOURCE_REMOVE;
}
/* GstBaseSink::event: on EOS, schedules stop_publish_invoker() on the task's
 * main context (only if the task's loop exists). Every event, EOS included,
 * is then passed on to the parent class handler, whose result is returned. */
static gboolean
gst_rtmp2_sink_event (GstBaseSink * sink, GstEvent * event)
{
  GstRtmp2Sink *self = GST_RTMP2_SINK (sink);

  if (GST_EVENT_TYPE (event) == GST_EVENT_EOS) {
    g_mutex_lock (&self->lock);
    if (self->loop) {
      GST_DEBUG_OBJECT (self, "Got EOS: stopping publish");
      g_main_context_invoke (self->context, stop_publish_invoker, self);
    }
    g_mutex_unlock (&self->lock);
  }

  return GST_BASE_SINK_CLASS (gst_rtmp2_sink_parent_class)->event (sink,
      event);
}
/* GstBaseSink::unlock: marks the sink as flushing and wakes any render()
 * call that is waiting on self->cond. Always returns TRUE. */
static gboolean
gst_rtmp2_sink_unlock (GstBaseSink * sink)
{
GstRtmp2Sink *self = GST_RTMP2_SINK (sink);
GST_DEBUG_OBJECT (self, "unlock");
g_mutex_lock (&self->lock);
self->flushing = TRUE;
g_cond_broadcast (&self->cond);
g_mutex_unlock (&self->lock);
return TRUE;
}
/* GstBaseSink::unlock_stop: clears the flushing flag set by unlock().
 * Always returns TRUE. */
static gboolean
gst_rtmp2_sink_unlock_stop (GstBaseSink * sink)
{
GstRtmp2Sink *self = GST_RTMP2_SINK (sink);
GST_DEBUG_OBJECT (self, "unlock_stop");
g_mutex_lock (&self->lock);
self->flushing = FALSE;
g_mutex_unlock (&self->lock);
return TRUE;
}
/* Converts one FLV buffer into an RTMP message buffer.
 *
 * Returns FALSE if the buffer cannot be mapped, is too small for the tag
 * header or tag body, or carries an unknown tag type; *outbuf is not written
 * in that case. Returns TRUE with *outbuf set to NULL when the buffer is an
 * FLV file header that should simply be skipped. Otherwise returns TRUE with
 * *outbuf set to a newly created message that the caller has to unref or
 * hand on.
 *
 * Side effect: updates self->last_ts and self->base_ts (timestamp fixup). */
static gboolean
buffer_to_message (GstRtmp2Sink * self, GstBuffer * buffer, GstBuffer ** outbuf)
{
GstBuffer *message;
GstRtmpFlvTagHeader header;
guint64 timestamp;
guint32 cstream;
/* Scope of the read mapping: header parsing and timestamp fixup */
{
GstMapInfo info;
if (G_UNLIKELY (!gst_buffer_map (buffer, &info, GST_MAP_READ))) {
GST_ERROR_OBJECT (self, "map failed: %" GST_PTR_FORMAT, buffer);
return FALSE;
}
/* FIXME: This is ugly and only works behind flvmux.
 * Implement true RTMP muxing. */
if (G_UNLIKELY (info.size >= 4 && memcmp (info.data, "FLV", 3) == 0)) {
/* drop the header, we don't need it */
GST_DEBUG_OBJECT (self, "ignoring FLV header: %" GST_PTR_FORMAT, buffer);
gst_buffer_unmap (buffer, &info);
*outbuf = NULL;
return TRUE;
}
if (!gst_rtmp_flv_tag_parse_header (&header, info.data, info.size)) {
GST_ERROR_OBJECT (self, "too small for tag header: %" GST_PTR_FORMAT,
buffer);
gst_buffer_unmap (buffer, &info);
return FALSE;
}
if (info.size < header.total_size) {
GST_ERROR_OBJECT (self, "too small for tag body: buffer %" G_GSIZE_FORMAT
", tag %" G_GSIZE_FORMAT, info.size, header.total_size);
gst_buffer_unmap (buffer, &info);
return FALSE;
}
/* flvmux timestamps roll over after about 49 days */
timestamp = header.timestamp;
/* A timestamp more than G_MAXINT32 behind the last one is treated as a
 * wrap-around: move the base forward by 2^32 (G_MAXUINT32 + 1) */
if (timestamp + self->base_ts + G_MAXINT32 < self->last_ts) {
GST_WARNING_OBJECT (self, "Timestamp regression %" G_GUINT64_FORMAT
" -> %" G_GUINT64_FORMAT "; assuming overflow", self->last_ts,
timestamp + self->base_ts);
self->base_ts += G_MAXUINT32;
self->base_ts += 1;
/* A timestamp more than G_MAXINT32 ahead of the last one is treated as
 * belonging to the previous wrap period: move the base back by 2^32, or
 * clamp to zero when the base is already zero */
} else if (timestamp + self->base_ts > self->last_ts + G_MAXINT32) {
GST_WARNING_OBJECT (self, "Timestamp jump %" G_GUINT64_FORMAT
" -> %" G_GUINT64_FORMAT "; assuming underflow", self->last_ts,
timestamp + self->base_ts);
if (self->base_ts > 0) {
self->base_ts -= G_MAXUINT32;
self->base_ts -= 1;
} else {
GST_WARNING_OBJECT (self, "Cannot regress further;"
" forcing timestamp to zero");
timestamp = 0;
}
}
timestamp += self->base_ts;
self->last_ts = timestamp;
gst_buffer_unmap (buffer, &info);
}
/* Each supported tag type is sent on its own chunk stream */
switch (header.type) {
case GST_RTMP_MESSAGE_TYPE_DATA_AMF0:
cstream = 4;
break;
case GST_RTMP_MESSAGE_TYPE_AUDIO:
cstream = 5;
break;
case GST_RTMP_MESSAGE_TYPE_VIDEO:
cstream = 6;
break;
default:
GST_ERROR_OBJECT (self, "unknown tag type %d", header.type);
return FALSE;
}
/* May not know stream ID yet; set later */
message = gst_rtmp_message_new (header.type, cstream, 0);
/* Append only the tag payload: skip the FLV tag header */
message = gst_buffer_append_region (message, gst_buffer_ref (buffer),
GST_RTMP_FLV_TAG_HEADER_SIZE, header.payload_size);
/* The fixed-up timestamp is in milliseconds */
GST_BUFFER_DTS (message) = timestamp * GST_MSECOND;
*outbuf = message;
return TRUE;
}
/* Tells whether an incoming buffer should be dropped because it is flagged
 * as a header buffer while streamheader messages (collected from the caps)
 * are already stored in self->headers. */
static gboolean
should_drop_header (GstRtmp2Sink * self, GstBuffer * buffer)
{
  gboolean have_streamheaders;

  /* Regular buffers are never dropped here */
  if (G_LIKELY (!GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_HEADER))) {
    return FALSE;
  }

  g_mutex_lock (&self->lock);
  have_streamheaders = self->headers->len > 0;
  g_mutex_unlock (&self->lock);

  /* Drop header buffers when we have streamheader caps */
  return have_streamheaders;
}
/* Stamps the message with the current stream ID and hands it to the
 * connection. Metadata messages are passed to
 * gst_rtmp_connection_set_data_frame(), all others are queued with
 * gst_rtmp_connection_queue_message().
 *
 * Returns early, without sending, if the buffer has no RTMP meta or the
 * stream ID is still 0. */
static void
send_message (GstRtmp2Sink * self, GstBuffer * message)
{
GstRtmpMeta *meta = gst_buffer_get_rtmp_meta (message);
g_return_if_fail (meta != NULL);
g_return_if_fail (self->stream_id != 0);
meta->mstream = self->stream_id;
if (gst_rtmp_message_is_metadata (message)) {
gst_rtmp_connection_set_data_frame (self->connection, message);
} else {
gst_rtmp_connection_queue_message (self->connection, message);
}
}
/* Sends all pending streamheader messages, in order, and empties the array
 * afterwards. Does nothing when no streamheader messages are stored. */
static void
send_streamheader (GstRtmp2Sink * self)
{
  guint idx;
  guint count = self->headers->len;

  if (G_LIKELY (count == 0)) {
    return;
  }

  GST_DEBUG_OBJECT (self, "Sending %u streamheader messages",
      self->headers->len);

  for (idx = 0; idx < count; idx++) {
    send_message (self, g_ptr_array_index (self->headers, idx));
  }

  /* Steal pointers: suppress free */
  g_ptr_array_set_free_func (self->headers, NULL);
  g_ptr_array_set_size (self->headers, 0);
  g_ptr_array_set_free_func (self->headers,
      (GDestroyNotify) gst_mini_object_unref);
}
/* TRUE while the sink has been started and is not flushing. The callers in
 * this file read it with self->lock held. */
static inline gboolean
is_running (GstRtmp2Sink * self)
{
return G_LIKELY (self->running && !self->flushing);
}
/* GstBaseSink::render: converts one FLV buffer into an RTMP message and
 * queues it on the connection.
 *
 * Header buffers that duplicate the streamheader caps and FLV file headers
 * are skipped with GST_FLOW_OK. If the connect task has not been started yet
 * (async-connect disabled) it is started here. The call then blocks until a
 * connection exists and its queue holds at most 3 messages, or until the sink
 * is flushed or stopped.
 *
 * Returns:
 *  - GST_FLOW_OK when the buffer was skipped or queued,
 *  - GST_FLOW_FLUSHING when the sink is flushing or no longer running,
 *  - GST_FLOW_ERROR when the buffer cannot be converted or when there is no
 *    connection although the sink is not flushing. */
static GstFlowReturn
gst_rtmp2_sink_render (GstBaseSink * sink, GstBuffer * buffer)
{
  GstRtmp2Sink *self = GST_RTMP2_SINK (sink);
  GstBuffer *message = NULL;
  GstFlowReturn ret;

  if (G_UNLIKELY (should_drop_header (self, buffer))) {
    GST_DEBUG_OBJECT (self, "Skipping header %" GST_PTR_FORMAT, buffer);
    return GST_FLOW_OK;
  }

  GST_LOG_OBJECT (self, "render %" GST_PTR_FORMAT, buffer);

  if (G_UNLIKELY (!buffer_to_message (self, buffer, &message))) {
    GST_ELEMENT_ERROR (self, STREAM, FAILED, ("Failed to convert FLV to RTMP"),
        ("Failed to convert %" GST_PTR_FORMAT, buffer));
    return GST_FLOW_ERROR;
  }

  /* buffer_to_message() succeeded without producing a message: FLV header */
  if (G_UNLIKELY (!message)) {
    GST_DEBUG_OBJECT (self, "Skipping %" GST_PTR_FORMAT, buffer);
    return GST_FLOW_OK;
  }

  g_mutex_lock (&self->lock);

  /* Delayed connect: the cancellable still exists while connecting has not
   * finished, so start the task if nobody did so far */
  if (G_UNLIKELY (is_running (self) && self->cancellable &&
          gst_task_get_state (self->task) != GST_TASK_STARTED)) {
    GST_DEBUG_OBJECT (self, "Starting connect");
    gst_task_start (self->task);
  }

  while (G_UNLIKELY (is_running (self) && !self->connection)) {
    GST_DEBUG_OBJECT (self, "Waiting for connection");
    g_cond_wait (&self->cond, &self->lock);
  }

  /* Back-pressure: keep at most a few messages queued on the connection */
  while (G_UNLIKELY (is_running (self) && self->connection &&
          gst_rtmp_connection_get_num_queued (self->connection) > 3)) {
    GST_LOG_OBJECT (self, "Waiting for queue");
    g_cond_wait (&self->cond, &self->lock);
  }

  if (G_UNLIKELY (self->flushing)) {
    /* unlock() woke us up. This is not an error, even when no connection
     * has been established yet, and no ERROR message has been posted. */
    gst_buffer_unref (message);
    ret = GST_FLOW_FLUSHING;
  } else if (G_UNLIKELY (!self->connection)) {
    gst_buffer_unref (message);
    /* send_connect_error has sent an ERROR message */
    ret = GST_FLOW_ERROR;
  } else if (G_UNLIKELY (!is_running (self))) {
    gst_buffer_unref (message);
    ret = GST_FLOW_FLUSHING;
  } else {
    send_streamheader (self);
    send_message (self, message);
    ret = GST_FLOW_OK;
  }

  g_mutex_unlock (&self->lock);
  return ret;
}
/* Converts one "streamheader" item into an RTMP message and appends it to
 * self->headers. Items that convert to no message (FLV file header) are
 * skipped. Returns FALSE if the value does not hold a buffer or the buffer
 * cannot be converted, TRUE otherwise. */
static gboolean
add_streamheader (GstRtmp2Sink * self, const GValue * value)
{
  GstBuffer *header_buf, *rtmp_msg;

  g_return_val_if_fail (value, FALSE);

  if (!GST_VALUE_HOLDS_BUFFER (value)) {
    GST_ERROR_OBJECT (self, "'streamheader' item of unexpected type '%s'",
        G_VALUE_TYPE_NAME (value));
    return FALSE;
  }

  header_buf = gst_value_get_buffer (value);

  if (!buffer_to_message (self, header_buf, &rtmp_msg)) {
    GST_ERROR_OBJECT (self, "Failed to read streamheader %" GST_PTR_FORMAT,
        header_buf);
    return FALSE;
  }

  if (!rtmp_msg) {
    GST_DEBUG_OBJECT (self, "Skipping streamheader %" GST_PTR_FORMAT,
        header_buf);
    return TRUE;
  }

  GST_DEBUG_OBJECT (self, "Adding streamheader %" GST_PTR_FORMAT, header_buf);
  g_ptr_array_add (self->headers, rtmp_msg);
  return TRUE;
}
/* GstBaseSink::set_caps: discards previously collected streamheader messages
 * and rebuilds them from the "streamheader" field of the first caps
 * structure. The field may be absent, hold a single buffer, or hold an array
 * of buffers. Returns FALSE if the field has another type or one of its
 * buffers cannot be converted. */
static gboolean
gst_rtmp2_sink_set_caps (GstBaseSink * sink, GstCaps * caps)
{
  GstRtmp2Sink *self = GST_RTMP2_SINK (sink);
  const GValue *header_value;
  guint n_buffers = 0;

  GST_DEBUG_OBJECT (self, "setcaps %" GST_PTR_FORMAT, caps);

  g_ptr_array_set_size (self->headers, 0);

  header_value = gst_structure_get_value (gst_caps_get_structure (caps, 0),
      "streamheader");

  if (!header_value) {
    GST_DEBUG_OBJECT (self, "'streamheader' field not present");
  } else if (GST_VALUE_HOLDS_BUFFER (header_value)) {
    GST_DEBUG_OBJECT (self, "'streamheader' field holds buffer");
    if (!add_streamheader (self, header_value)) {
      return FALSE;
    }
    n_buffers = 1;
  } else if (GST_VALUE_HOLDS_ARRAY (header_value)) {
    guint total = gst_value_array_get_size (header_value);

    GST_DEBUG_OBJECT (self, "'streamheader' field holds array");
    while (n_buffers < total) {
      const GValue *item = gst_value_array_get_value (header_value,
          n_buffers);

      if (!add_streamheader (self, item)) {
        return FALSE;
      }
      n_buffers++;
    }
  } else {
    GST_ERROR_OBJECT (self, "'streamheader' field has unexpected type '%s'",
        G_VALUE_TYPE_NAME (header_value));
    return FALSE;
  }

  GST_DEBUG_OBJECT (self, "Collected streamheaders: %u buffers -> %u messages",
      n_buffers, self->headers->len);
  return TRUE;
}
/* Mainloop task */
/* Body of self->task. Creates a main context and loop, starts the
 * asynchronous connect and runs the loop until it is quit. Afterwards it
 * snapshots the connection stats, closes the connection, drains the context
 * and drops any remaining streamheader messages. */
static void
gst_rtmp2_sink_task_func (gpointer user_data)
{
GstRtmp2Sink *self = GST_RTMP2_SINK (user_data);
GMainContext *context;
GMainLoop *loop;
GTask *connector;
GST_DEBUG_OBJECT (self, "gst_rtmp2_sink_task starting");
g_mutex_lock (&self->lock);
context = self->context = g_main_context_new ();
g_main_context_push_thread_default (context);
loop = self->loop = g_main_loop_new (context, TRUE);
/* The GTask carries the connect result to connect_task_done() */
connector = g_task_new (self, self->cancellable, connect_task_done, NULL);
/* Stats of a previous run are no longer valid */
g_clear_pointer (&self->stats, gst_structure_free);
/* self->location is protected by the object lock */
GST_OBJECT_LOCK (self);
gst_rtmp_client_connect_async (&self->location, self->cancellable,
client_connect_done, connector);
GST_OBJECT_UNLOCK (self);
/* Run loop */
g_mutex_unlock (&self->lock);
g_main_loop_run (loop);
g_mutex_lock (&self->lock);
/* Keep a last stats snapshot for the "stats" property */
if (self->connection) {
self->stats = gst_rtmp_connection_get_stats (self->connection);
}
g_clear_pointer (&self->loop, g_main_loop_unref);
g_clear_pointer (&self->connection, gst_rtmp_connection_close_and_unref);
/* Wake render() calls waiting for the connection or its queue */
g_cond_broadcast (&self->cond);
/* Run loop cleanup */
g_mutex_unlock (&self->lock);
while (g_main_context_pending (context)) {
GST_DEBUG_OBJECT (self, "iterating main context to clean up");
g_main_context_iteration (context, FALSE);
}
g_main_context_pop_thread_default (context);
g_mutex_lock (&self->lock);
g_clear_pointer (&self->context, g_main_context_unref);
g_ptr_array_set_size (self->headers, 0);
g_mutex_unlock (&self->lock);
GST_DEBUG_OBJECT (self, "gst_rtmp2_sink_task exiting");
}
/* Completion callback of gst_rtmp_client_connect_async(); user_data is the
 * connector GTask created by the task function. On failure or cancellation
 * the task is completed with an error. Otherwise the connection is stored as
 * task data and publishing of the configured stream is started. */
static void
client_connect_done (GObject * source, GAsyncResult * result,
gpointer user_data)
{
GTask *task = user_data;
GstRtmp2Sink *self = g_task_get_source_object (task);
GError *error = NULL;
GstRtmpConnection *connection;
connection = gst_rtmp_client_connect_finish (result, &error);
if (!connection) {
g_task_return_error (task, error);
g_object_unref (task);
return;
}
/* From here on the task holds the connection reference and unrefs it when
 * the task data is replaced or the task goes away */
g_task_set_task_data (task, connection, g_object_unref);
if (g_task_return_error_if_cancelled (task)) {
g_object_unref (task);
return;
}
/* self->location is protected by the object lock */
GST_OBJECT_LOCK (self);
gst_rtmp_client_start_publish_async (connection, self->location.stream,
g_task_get_cancellable (task), start_publish_done, task);
GST_OBJECT_UNLOCK (self);
}
/* Completion callback of gst_rtmp_client_start_publish_async(); user_data is
 * the connector GTask. On success the stream ID is stored in self->stream_id
 * and the task is completed with a new reference to the connection; on
 * failure or cancellation it is completed with an error. */
static void
start_publish_done (GObject * source, GAsyncResult * result, gpointer user_data)
{
GTask *task = G_TASK (user_data);
GstRtmp2Sink *self = g_task_get_source_object (task);
GstRtmpConnection *connection = g_task_get_task_data (task);
GError *error = NULL;
if (g_task_return_error_if_cancelled (task)) {
g_object_unref (task);
return;
}
if (gst_rtmp_client_start_publish_finish (connection, result,
&self->stream_id, &error)) {
/* The result gets its own reference; an unclaimed result is disposed of
 * with gst_rtmp_connection_close_and_unref */
g_task_return_pointer (task, g_object_ref (connection),
gst_rtmp_connection_close_and_unref);
} else {
g_task_return_error (task, error);
}
/* Drops the reference stored as task data by client_connect_done() */
g_task_set_task_data (task, NULL, NULL);
g_object_unref (task);
}
/* Output handler installed on the connection by connect_task_done(). Wakes
 * one waiter on self->cond so that a render() call blocked on the queue
 * length re-checks it. */
static void
put_chunk (GstRtmpConnection * connection, gpointer user_data)
{
GstRtmp2Sink *self = GST_RTMP2_SINK (user_data);
g_mutex_lock (&self->lock);
g_cond_signal (&self->cond);
g_mutex_unlock (&self->lock);
}
/* Handler of the connection's "error" signal. While a cancellable still
 * exists it is cancelled. Otherwise, if the task's loop exists, an element
 * error is posted and the task is stopped. */
static void
error_callback (GstRtmpConnection * connection, const GError * error,
GstRtmp2Sink * self)
{
g_mutex_lock (&self->lock);
if (self->cancellable) {
g_cancellable_cancel (self->cancellable);
} else if (self->loop) {
GST_ELEMENT_ERROR (self, RESOURCE, WRITE,
("Connection error: %s", error->message),
("domain %s, code %d", g_quark_to_string (error->domain), error->code));
stop_task (self);
}
g_mutex_unlock (&self->lock);
}
/* Reports a failed connect attempt as an element error.
 *
 * A NULL error is reported as a generic failure. A cancelled connect is only
 * logged and posts no error. Permission-denied and connection-refused errors
 * get dedicated resource error codes; anything else is reported as
 * RESOURCE/FAILED with the error domain and code as debug detail. */
static void
send_connect_error (GstRtmp2Sink * self, GError * error)
{
  if (!error) {
    GST_ERROR_OBJECT (self, "Connect failed with NULL error");
    GST_ELEMENT_ERROR (self, RESOURCE, FAILED, ("Failed to connect"), (NULL));
    return;
  }

  /* Cancellation is requested by the element itself: not an error */
  if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
    GST_DEBUG_OBJECT (self, "Connection was cancelled: %s", error->message);
    return;
  }

  GST_ERROR_OBJECT (self, "Failed to connect: %s %d %s",
      g_quark_to_string (error->domain), error->code, error->message);

  if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_PERMISSION_DENIED)) {
    GST_ELEMENT_ERROR (self, RESOURCE, NOT_AUTHORIZED,
        ("Not authorized to connect: %s", error->message), (NULL));
    return;
  }

  if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CONNECTION_REFUSED)) {
    /* NOTE(review): a sink reporting OPEN_READ looks odd; OPEN_WRITE may
     * have been intended. Kept as is because applications may match on it. */
    GST_ELEMENT_ERROR (self, RESOURCE, OPEN_READ,
        ("Connection refused: %s", error->message), (NULL));
    return;
  }

  GST_ELEMENT_ERROR (self, RESOURCE, FAILED,
      ("Failed to connect: %s", error->message),
      ("domain %s, code %d", g_quark_to_string (error->domain), error->code));
}
/* Completion callback of the connector GTask created by the task function.
 *
 * Drops self->cancellable if it is the one the task was created with, then
 * takes the task result. On success the connection is stored, the pacing
 * rate and chunk size are applied, and the output handler and "error" signal
 * handler are installed. On failure the error is reported and the task is
 * stopped. Waiters on self->cond are woken in both cases. */
static void
connect_task_done (GObject * object, GAsyncResult * result, gpointer user_data)
{
  GstRtmp2Sink *self = GST_RTMP2_SINK (object);
  GTask *task = G_TASK (result);
  GError *error = NULL;

  g_mutex_lock (&self->lock);

  g_warn_if_fail (g_task_is_valid (task, object));

  /* Connecting is over; error_callback() relies on the cancellable being
   * gone to tell a live connection from a pending connect */
  if (self->cancellable == g_task_get_cancellable (task)) {
    g_clear_object (&self->cancellable);
  }

  self->connection = g_task_propagate_pointer (task, &error);
  if (self->connection) {
    set_pacing_rate (self);
    set_chunk_size (self);
    gst_rtmp_connection_set_output_handler (self->connection,
        put_chunk, g_object_ref (self), g_object_unref);
    g_signal_connect_object (self->connection, "error",
        G_CALLBACK (error_callback), self, 0);
  } else {
    send_connect_error (self, error);
    stop_task (self);
    /* send_connect_error() explicitly copes with a NULL error, so it must
     * not be passed to g_error_free(), which rejects NULL with a critical
     * warning */
    if (error) {
      g_error_free (error);
    }
  }

  g_cond_broadcast (&self->cond);
  g_mutex_unlock (&self->lock);
}
/* Applies pacing_rate to the socket through the SO_MAX_PACING_RATE socket
 * option. Where that option is not available at build time, only a rate of
 * -1 is accepted; any other rate fails with G_IO_ERROR_NOT_SUPPORTED.
 * Returns TRUE on success, FALSE with *error set on failure. */
static gboolean
socket_set_pacing_rate (GSocket * socket, gint pacing_rate, GError ** error)
{
#ifdef SO_MAX_PACING_RATE
if (!g_socket_set_option (socket, SOL_SOCKET, SO_MAX_PACING_RATE,
pacing_rate, error)) {
g_prefix_error (error, "setsockopt failed: ");
return FALSE;
}
#else
if (pacing_rate != -1) {
g_set_error (error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED,
"SO_MAX_PACING_RATE is not supported");
return FALSE;
}
#endif
return TRUE;
}
/* Pushes the "peak-kbps" setting to the connection's socket as a pacing rate
 * in bytes per second (kbit/s * 125); a value of 0 is passed on as -1. Does
 * nothing without a connection. Failures are only logged. */
static void
set_pacing_rate (GstRtmp2Sink * self)
{
  GError *err = NULL;
  gint rate;

  if (!self->connection) {
    return;
  }

  GST_OBJECT_LOCK (self);
  if (self->peak_kbps) {
    rate = self->peak_kbps * 125;
  } else {
    rate = -1;
  }
  GST_OBJECT_UNLOCK (self);

  if (!socket_set_pacing_rate (gst_rtmp_connection_get_socket
          (self->connection), rate, &err)) {
    GST_WARNING_OBJECT (self, "Could not set pacing rate: %s", err->message);
  } else {
    GST_INFO_OBJECT (self, "Set pacing rate to %d Bps", rate);
  }

  g_clear_error (&err);
}
/* Pushes the "chunk-size" setting to the connection. Does nothing without a
 * connection. The value is read under the object lock. */
static void
set_chunk_size (GstRtmp2Sink * self)
{
guint32 chunk_size;
if (!self->connection)
return;
GST_OBJECT_LOCK (self);
chunk_size = self->chunk_size;
GST_OBJECT_UNLOCK (self);
gst_rtmp_connection_set_chunk_size (self->connection, chunk_size);
GST_INFO_OBJECT (self, "Set chunk size to %" G_GUINT32_FORMAT, chunk_size);
}
/* Builds the value of the "stats" property. Uses live stats from the
 * connection when there is one, else a copy of the snapshot saved when the
 * task's loop last exited, else the null stats structure. The caller owns
 * the returned structure. */
static GstStructure *
gst_rtmp2_sink_get_stats (GstRtmp2Sink * self)
{
  GstStructure *result;

  g_mutex_lock (&self->lock);

  if (!self->connection && !self->stats) {
    result = gst_rtmp_connection_get_null_stats ();
  } else if (self->connection) {
    result = gst_rtmp_connection_get_stats (self->connection);
  } else {
    result = gst_structure_copy (self->stats);
  }

  g_mutex_unlock (&self->lock);

  return result;
}