gstreamer/gst/rtmp2/rtmp/rtmpclient.c
Matthew Waters 640a65bf96 gst: don't use volatile to mean atomic
volatile is not sufficient to provide atomic guarantees and real atomics
should be used instead.  GCC 11 has started warning about using volatile
with atomic operations.

https://gitlab.gnome.org/GNOME/glib/-/merge_requests/1719

Discovered in https://gitlab.freedesktop.org/gstreamer/gst-plugins-good/-/issues/868

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-bad/-/merge_requests/2098>
2021-03-22 14:34:36 +11:00

1408 lines
40 KiB
C

/* GStreamer RTMP Library
* Copyright (C) 2013 David Schleef <ds@schleef.org>
* Copyright (C) 2017 Make.TV, Inc. <info@make.tv>
* Contact: Jan Alexander Steffens (heftig) <jsteffens@make.tv>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
* Boston, MA 02110-1335, USA.
*/
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#include <gst/gst.h>
#include <gio/gio.h>
#include <string.h>
#include "rtmpclient.h"
#include "rtmphandshake.h"
#include "rtmpmessage.h"
#include "rtmputils.h"
GST_DEBUG_CATEGORY_STATIC (gst_rtmp_client_debug_category);
#define GST_CAT_DEFAULT gst_rtmp_client_debug_category
static void send_connect_done (const gchar * command_name, GPtrArray * args,
gpointer user_data);
static void create_stream_done (const gchar * command_name, GPtrArray * args,
gpointer user_data);
static void on_publish_or_play_status (const gchar * command_name,
GPtrArray * args, gpointer user_data);
static void
init_debug (void)
{
static gsize done = 0;
if (g_once_init_enter (&done)) {
GST_DEBUG_CATEGORY_INIT (gst_rtmp_client_debug_category,
"rtmpclient", 0, "debug category for the rtmp client");
GST_DEBUG_REGISTER_FUNCPTR (send_connect_done);
GST_DEBUG_REGISTER_FUNCPTR (create_stream_done);
GST_DEBUG_REGISTER_FUNCPTR (on_publish_or_play_status);
g_once_init_leave (&done, 1);
}
}
static const gchar *scheme_strings[] = {
"rtmp",
"rtmps",
NULL
};
#define NUM_SCHEMES (G_N_ELEMENTS (scheme_strings) - 1)
GType
gst_rtmp_scheme_get_type (void)
{
static gsize scheme_type = 0;
static const GEnumValue scheme[] = {
{GST_RTMP_SCHEME_RTMP, "GST_RTMP_SCHEME_RTMP", "rtmp"},
{GST_RTMP_SCHEME_RTMPS, "GST_RTMP_SCHEME_RTMPS", "rtmps"},
{0, NULL, NULL},
};
if (g_once_init_enter (&scheme_type)) {
GType tmp = g_enum_register_static ("GstRtmpScheme", scheme);
g_once_init_leave (&scheme_type, tmp);
}
return (GType) scheme_type;
}
GstRtmpScheme
gst_rtmp_scheme_from_string (const gchar * string)
{
if (string) {
gint value;
for (value = 0; value < NUM_SCHEMES; value++) {
if (strcmp (scheme_strings[value], string) == 0) {
return value;
}
}
}
return -1;
}
GstRtmpScheme
gst_rtmp_scheme_from_uri (const GstUri * uri)
{
const gchar *scheme = gst_uri_get_scheme (uri);
if (!scheme) {
return GST_RTMP_SCHEME_RTMP;
}
return gst_rtmp_scheme_from_string (scheme);
}
const gchar *
gst_rtmp_scheme_to_string (GstRtmpScheme scheme)
{
if (scheme >= 0 && scheme < NUM_SCHEMES) {
return scheme_strings[scheme];
}
return "invalid";
}
const gchar *const *
gst_rtmp_scheme_get_strings (void)
{
return scheme_strings;
}
guint
gst_rtmp_scheme_get_default_port (GstRtmpScheme scheme)
{
switch (scheme) {
case GST_RTMP_SCHEME_RTMP:
return 1935;
case GST_RTMP_SCHEME_RTMPS:
return 443;
default:
g_return_val_if_reached (0);
}
}
GType
gst_rtmp_authmod_get_type (void)
{
static gsize authmod_type = 0;
static const GEnumValue authmod[] = {
{GST_RTMP_AUTHMOD_NONE, "GST_RTMP_AUTHMOD_NONE", "none"},
{GST_RTMP_AUTHMOD_AUTO, "GST_RTMP_AUTHMOD_AUTO", "auto"},
{GST_RTMP_AUTHMOD_ADOBE, "GST_RTMP_AUTHMOD_ADOBE", "adobe"},
{0, NULL, NULL},
};
if (g_once_init_enter (&authmod_type)) {
GType tmp = g_enum_register_static ("GstRtmpAuthmod", authmod);
g_once_init_leave (&authmod_type, tmp);
}
return (GType) authmod_type;
}
static const gchar *
gst_rtmp_authmod_get_nick (GstRtmpAuthmod value)
{
GEnumClass *klass = g_type_class_peek (GST_TYPE_RTMP_AUTHMOD);
GEnumValue *ev = klass ? g_enum_get_value (klass, value) : NULL;
return ev ? ev->value_nick : "(unknown)";
}
GType
gst_rtmp_stop_commands_get_type (void)
{
static gsize stop_commands_type = 0;
static const GFlagsValue stop_commands[] = {
{GST_RTMP_STOP_COMMANDS_NONE, "No command", "none"},
{GST_RTMP_STOP_COMMANDS_FCUNPUBLISH, "FCUnpublish", "fcunpublish"},
{GST_RTMP_STOP_COMMANDS_CLOSE_STREAM, "closeStream", "closestream"},
{GST_RTMP_STOP_COMMANDS_DELETE_STREAM, "deleteStream", "deletestream"},
{0, NULL, NULL},
};
if (g_once_init_enter (&stop_commands_type)) {
GType tmp = g_flags_register_static ("GstRtmpStopCommands", stop_commands);
g_once_init_leave (&stop_commands_type, tmp);
}
return (GType) stop_commands_type;
}
void
gst_rtmp_location_copy (GstRtmpLocation * dest, const GstRtmpLocation * src)
{
g_return_if_fail (dest);
g_return_if_fail (src);
dest->scheme = src->scheme;
dest->host = g_strdup (src->host);
dest->port = src->port;
dest->application = g_strdup (src->application);
dest->stream = g_strdup (src->stream);
dest->username = g_strdup (src->username);
dest->password = g_strdup (src->password);
dest->secure_token = g_strdup (src->secure_token);
dest->authmod = src->authmod;
dest->timeout = src->timeout;
dest->tls_flags = src->tls_flags;
dest->flash_ver = g_strdup (src->flash_ver);
dest->publish = src->publish;
}
void
gst_rtmp_location_clear (GstRtmpLocation * location)
{
g_return_if_fail (location);
g_clear_pointer (&location->host, g_free);
location->port = 0;
g_clear_pointer (&location->application, g_free);
g_clear_pointer (&location->stream, g_free);
g_clear_pointer (&location->username, g_free);
g_clear_pointer (&location->password, g_free);
g_clear_pointer (&location->secure_token, g_free);
g_clear_pointer (&location->flash_ver, g_free);
location->publish = FALSE;
}
gchar *
gst_rtmp_location_get_string (const GstRtmpLocation * location,
gboolean with_stream)
{
GstUri *uri;
gchar *base, *string;
const gchar *scheme_string;
guint default_port;
g_return_val_if_fail (location, NULL);
scheme_string = gst_rtmp_scheme_to_string (location->scheme);
default_port = gst_rtmp_scheme_get_default_port (location->scheme);
uri = gst_uri_new (scheme_string, NULL, location->host,
location->port == default_port ? GST_URI_NO_PORT : location->port, "/",
NULL, NULL);
base = gst_uri_to_string (uri);
string = g_strconcat (base, location->application, with_stream ? "/" : NULL,
location->stream, NULL);
g_free (base);
gst_uri_unref (uri);
return string;
}
/* Flag values for the audioCodecs property,
* rtmp_specification_1.0.pdf page 32 */
enum
{
SUPPORT_SND_NONE = 0x001, /* Raw sound, no compression */
SUPPORT_SND_ADPCM = 0x002, /* ADPCM compression */
SUPPORT_SND_MP3 = 0x004, /* mp3 compression */
SUPPORT_SND_INTEL = 0x008, /* Not used */
SUPPORT_SND_UNUSED = 0x010, /* Not used */
SUPPORT_SND_NELLY8 = 0x020, /* NellyMoser at 8-kHz compression */
SUPPORT_SND_NELLY = 0x040, /* NellyMoser compression
* (5, 11, 22, and 44 kHz) */
SUPPORT_SND_G711A = 0x080, /* G711A sound compression
* (Flash Media Server only) */
SUPPORT_SND_G711U = 0x100, /* G711U sound compression
* (Flash Media Server only) */
SUPPORT_SND_NELLY16 = 0x200, /* NellyMoser at 16-kHz compression */
SUPPORT_SND_AAC = 0x400, /* Advanced audio coding (AAC) codec */
SUPPORT_SND_SPEEX = 0x800, /* Speex Audio */
SUPPORT_SND_ALL = 0xFFF, /* All RTMP-supported audio codecs */
};
/* audioCodecs value sent by libavformat. All "used" codecs. */
#define GST_RTMP_AUDIOCODECS \
(SUPPORT_SND_ALL & ~SUPPORT_SND_INTEL & ~SUPPORT_SND_UNUSED)
G_STATIC_ASSERT (GST_RTMP_AUDIOCODECS == 4071); /* libavformat's magic number */
/* Flag values for the videoCodecs property,
* rtmp_specification_1.0.pdf page 32 */
enum
{
SUPPORT_VID_UNUSED = 0x01, /* Obsolete value */
SUPPORT_VID_JPEG = 0x02, /* Obsolete value */
SUPPORT_VID_SORENSON = 0x04, /* Sorenson Flash video */
SUPPORT_VID_HOMEBREW = 0x08, /* V1 screen sharing */
SUPPORT_VID_VP6 = 0x10, /* On2 video (Flash 8+) */
SUPPORT_VID_VP6ALPHA = 0x20, /* On2 video with alpha channel */
SUPPORT_VID_HOMEBREWV = 0x40, /* Screen sharing version 2 (Flash 8+) */
SUPPORT_VID_H264 = 0x80, /* H264 video */
SUPPORT_VID_ALL = 0xFF, /* All RTMP-supported video codecs */
};
/* videoCodecs value sent by libavformat. All non-obsolete codecs. */
#define GST_RTMP_VIDEOCODECS \
(SUPPORT_VID_ALL & ~SUPPORT_VID_UNUSED & ~SUPPORT_VID_JPEG)
G_STATIC_ASSERT (GST_RTMP_VIDEOCODECS == 252); /* libavformat's magic number */
/* Flag values for the videoFunction property,
* rtmp_specification_1.0.pdf page 32 */
enum
{
/* Indicates that the client can perform frame-accurate seeks. */
SUPPORT_VID_CLIENT_SEEK = 1,
};
/* videoFunction value sent by libavformat */
#define GST_RTMP_VIDEOFUNCTION (SUPPORT_VID_CLIENT_SEEK)
G_STATIC_ASSERT (GST_RTMP_VIDEOFUNCTION == 1); /* libavformat's magic number */
static void socket_connect (GTask * task);
static void socket_connect_done (GObject * source, GAsyncResult * result,
gpointer user_data);
static void handshake_done (GObject * source, GAsyncResult * result,
gpointer user_data);
static void send_connect (GTask * task);
static void send_stop (GstRtmpConnection * connection, const gchar * stream,
const GstRtmpStopCommands stop_commands);
static void send_secure_token_response (GTask * task,
GstRtmpConnection * connection, const gchar * challenge);
static void connection_error (GstRtmpConnection * connection,
gpointer user_data);
#define DEFAULT_TIMEOUT 5
typedef struct
{
GstRtmpLocation location;
gchar *auth_query;
GstRtmpConnection *connection;
gulong error_handler_id;
} ConnectTaskData;
static ConnectTaskData *
connect_task_data_new (const GstRtmpLocation * location)
{
ConnectTaskData *data = g_slice_new0 (ConnectTaskData);
gst_rtmp_location_copy (&data->location, location);
return data;
}
static void
connect_task_data_free (gpointer ptr)
{
ConnectTaskData *data = ptr;
gst_rtmp_location_clear (&data->location);
g_clear_pointer (&data->auth_query, g_free);
if (data->error_handler_id) {
g_signal_handler_disconnect (data->connection, data->error_handler_id);
}
g_clear_object (&data->connection);
g_slice_free (ConnectTaskData, data);
}
static GRegex *auth_regex = NULL;
void
gst_rtmp_client_connect_async (const GstRtmpLocation * location,
GCancellable * cancellable, GAsyncReadyCallback callback,
gpointer user_data)
{
GTask *task;
init_debug ();
if (g_once_init_enter (&auth_regex)) {
GRegex *re = g_regex_new ("\\[ *AccessManager.Reject *\\] *: *"
"\\[ *authmod=(?<authmod>.*?) *\\] *: *"
"(?<query>\\?.*)\\Z", G_REGEX_DOTALL, 0, NULL);
g_once_init_leave (&auth_regex, re);
}
task = g_task_new (NULL, cancellable, callback, user_data);
g_task_set_task_data (task, connect_task_data_new (location),
connect_task_data_free);
socket_connect (task);
}
static void
socket_connect (GTask * task)
{
ConnectTaskData *data = g_task_get_task_data (task);
GSocketConnectable *addr;
GSocketClient *socket_client;
if (data->location.timeout < 0) {
data->location.timeout = DEFAULT_TIMEOUT;
}
if (data->error_handler_id) {
g_signal_handler_disconnect (data->connection, data->error_handler_id);
data->error_handler_id = 0;
}
if (data->connection) {
gst_rtmp_connection_close (data->connection);
g_clear_object (&data->connection);
}
if (!data->location.host) {
g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_NOT_INITIALIZED,
"Host is not set");
g_object_unref (task);
return;
}
if (!data->location.port) {
g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_NOT_INITIALIZED,
"Port is not set");
g_object_unref (task);
return;
}
socket_client = g_socket_client_new ();
g_socket_client_set_timeout (socket_client, data->location.timeout);
switch (data->location.scheme) {
case GST_RTMP_SCHEME_RTMP:
break;
case GST_RTMP_SCHEME_RTMPS:
GST_DEBUG ("Configuring TLS, validation flags 0x%02x",
data->location.tls_flags);
g_socket_client_set_tls (socket_client, TRUE);
g_socket_client_set_tls_validation_flags (socket_client,
data->location.tls_flags);
break;
default:
g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED,
"Invalid scheme ID %d", data->location.scheme);
g_object_unref (socket_client);
g_object_unref (task);
return;
}
addr = g_network_address_new (data->location.host, data->location.port);
GST_DEBUG ("Starting socket connection");
g_socket_client_connect_async (socket_client, addr,
g_task_get_cancellable (task), socket_connect_done, task);
g_object_unref (addr);
g_object_unref (socket_client);
}
static void
socket_connect_done (GObject * source, GAsyncResult * result,
gpointer user_data)
{
GSocketClient *socket_client = G_SOCKET_CLIENT (source);
GSocketConnection *socket_connection;
GTask *task = user_data;
GError *error = NULL;
socket_connection =
g_socket_client_connect_finish (socket_client, result, &error);
if (g_task_return_error_if_cancelled (task)) {
GST_DEBUG ("Socket connection was cancelled");
g_object_unref (task);
return;
}
if (socket_connection == NULL) {
GST_ERROR ("Socket connection error");
g_task_return_error (task, error);
g_object_unref (task);
return;
}
GST_DEBUG ("Socket connection established");
gst_rtmp_client_handshake (G_IO_STREAM (socket_connection), FALSE,
g_task_get_cancellable (task), handshake_done, task);
g_object_unref (socket_connection);
}
static void
handshake_done (GObject * source, GAsyncResult * result, gpointer user_data)
{
GIOStream *stream = G_IO_STREAM (source);
GSocketConnection *socket_connection = G_SOCKET_CONNECTION (stream);
GTask *task = user_data;
ConnectTaskData *data = g_task_get_task_data (task);
GError *error = NULL;
gboolean res;
res = gst_rtmp_client_handshake_finish (stream, result, &error);
if (!res) {
g_io_stream_close_async (stream, G_PRIORITY_DEFAULT, NULL, NULL, NULL);
g_task_return_error (task, error);
g_object_unref (task);
return;
}
data->connection = gst_rtmp_connection_new (socket_connection,
g_task_get_cancellable (task));
data->error_handler_id = g_signal_connect (data->connection,
"error", G_CALLBACK (connection_error), task);
send_connect (task);
}
static void
connection_error (GstRtmpConnection * connection, gpointer user_data)
{
GTask *task = user_data;
if (!g_task_had_error (task))
g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_FAILED,
"error during connection attempt");
}
static gchar *
do_adobe_auth (const gchar * username, const gchar * password,
const gchar * salt, const gchar * opaque, const gchar * challenge)
{
guint8 hash[16]; /* MD5 digest */
gsize hashlen = sizeof hash;
gchar *challenge2, *auth_query;
GChecksum *md5;
g_return_val_if_fail (username, NULL);
g_return_val_if_fail (password, NULL);
g_return_val_if_fail (salt, NULL);
md5 = g_checksum_new (G_CHECKSUM_MD5);
g_checksum_update (md5, (guchar *) username, -1);
g_checksum_update (md5, (guchar *) salt, -1);
g_checksum_update (md5, (guchar *) password, -1);
g_checksum_get_digest (md5, hash, &hashlen);
g_warn_if_fail (hashlen == sizeof hash);
{
gchar *hashstr = g_base64_encode ((guchar *) hash, sizeof hash);
g_checksum_reset (md5);
g_checksum_update (md5, (guchar *) hashstr, -1);
g_free (hashstr);
}
if (opaque)
g_checksum_update (md5, (guchar *) opaque, -1);
else if (challenge)
g_checksum_update (md5, (guchar *) challenge, -1);
challenge2 = g_strdup_printf ("%08x", g_random_int ());
g_checksum_update (md5, (guchar *) challenge2, -1);
g_checksum_get_digest (md5, hash, &hashlen);
g_warn_if_fail (hashlen == sizeof hash);
{
gchar *hashstr = g_base64_encode ((guchar *) hash, sizeof hash);
if (opaque) {
auth_query =
g_strdup_printf
("authmod=%s&user=%s&challenge=%s&response=%s&opaque=%s", "adobe",
username, challenge2, hashstr, opaque);
} else {
auth_query =
g_strdup_printf ("authmod=%s&user=%s&challenge=%s&response=%s",
"adobe", username, challenge2, hashstr);
}
g_free (hashstr);
}
g_checksum_free (md5);
g_free (challenge2);
return auth_query;
}
static void
send_connect (GTask * task)
{
ConnectTaskData *data = g_task_get_task_data (task);
GstAmfNode *node;
const gchar *app, *flash_ver;
gchar *uri, *appstr = NULL, *uristr = NULL;
gboolean publish;
node = gst_amf_node_new_object ();
app = data->location.application;
flash_ver = data->location.flash_ver;
publish = data->location.publish;
uri = gst_rtmp_location_get_string (&data->location, FALSE);
if (!app) {
g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_NOT_INITIALIZED,
"Application is not set");
g_object_unref (task);
goto out;
}
if (!flash_ver) {
g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_NOT_INITIALIZED,
"Flash version is not set");
g_object_unref (task);
goto out;
}
if (data->auth_query) {
const gchar *query = data->auth_query;
appstr = g_strdup_printf ("%s?%s", app, query);
uristr = g_strdup_printf ("%s?%s", uri, query);
} else if (data->location.authmod == GST_RTMP_AUTHMOD_ADOBE) {
const gchar *user = data->location.username;
const gchar *authmod = "adobe";
if (!user) {
g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_PERMISSION_DENIED,
"no username for adobe authentication");
g_object_unref (task);
goto out;
}
if (!data->location.password) {
g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_PERMISSION_DENIED,
"no password for adobe authentication");
g_object_unref (task);
goto out;
}
appstr = g_strdup_printf ("%s?authmod=%s&user=%s", app, authmod, user);
uristr = g_strdup_printf ("%s?authmod=%s&user=%s", uri, authmod, user);
} else {
appstr = g_strdup (app);
uristr = g_strdup (uri);
}
/* Arguments for the connect command.
* Most of these are described in rtmp_specification_1.0.pdf page 30 */
/* "The server application name the client is connected to." */
gst_amf_node_append_field_take_string (node, "app", appstr, -1);
if (publish) {
/* Undocumented. Sent by both libavformat and librtmp. */
gst_amf_node_append_field_string (node, "type", "nonprivate", -1);
}
/* "Flash Player version. It is the same string as returned by the
* ApplicationScript getversion () function." */
gst_amf_node_append_field_string (node, "flashVer", flash_ver, -1);
/* "URL of the source SWF file making the connection."
* XXX: libavformat sends "swfUrl" here, if provided. */
/* "URL of the Server. It has the following format.
* protocol://servername:port/appName/appInstance" */
gst_amf_node_append_field_take_string (node, "tcUrl", uristr, -1);
if (!publish) {
/* "True if proxy is being used." */
gst_amf_node_append_field_boolean (node, "fpad", FALSE);
/* Undocumented. Sent by libavformat. */
gst_amf_node_append_field_number (node, "capabilities",
15 /* libavformat's magic number */ );
/* "Indicates what audio codecs the client supports." */
gst_amf_node_append_field_number (node, "audioCodecs",
GST_RTMP_AUDIOCODECS);
/* "Indicates what video codecs are supported." */
gst_amf_node_append_field_number (node, "videoCodecs",
GST_RTMP_VIDEOCODECS);
/* "Indicates what special video functions are supported." */
gst_amf_node_append_field_number (node, "videoFunction",
GST_RTMP_VIDEOFUNCTION);
/* "URL of the web page from where the SWF file was loaded."
* XXX: libavformat sends "pageUrl" here, if provided. */
}
gst_rtmp_connection_send_command (data->connection, send_connect_done,
task, 0, "connect", node, NULL);
out:
gst_amf_node_free (node);
g_free (uri);
}
static void
send_connect_done (const gchar * command_name, GPtrArray * args,
gpointer user_data)
{
GTask *task = G_TASK (user_data);
ConnectTaskData *data = g_task_get_task_data (task);
const GstAmfNode *node, *optional_args;
const gchar *code;
if (g_task_return_error_if_cancelled (task)) {
g_object_unref (task);
return;
}
if (!args) {
g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_FAILED,
"connect failed: %s", command_name);
g_object_unref (task);
return;
}
if (args->len < 2) {
g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_FAILED,
"connect failed; not enough return arguments");
g_object_unref (task);
return;
}
optional_args = g_ptr_array_index (args, 1);
node = gst_amf_node_get_field (optional_args, "code");
if (!node) {
g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_FAILED,
"result code missing from connect cmd result");
g_object_unref (task);
return;
}
code = gst_amf_node_peek_string (node, NULL);
GST_INFO ("connect result: %s", GST_STR_NULL (code));
if (g_str_equal (code, "NetConnection.Connect.Success")) {
node = gst_amf_node_get_field (optional_args, "secureToken");
send_secure_token_response (task, data->connection,
node ? gst_amf_node_peek_string (node, NULL) : NULL);
return;
}
if (g_str_equal (code, "NetConnection.Connect.Rejected")) {
GstRtmpAuthmod authmod = data->location.authmod;
GMatchInfo *match_info;
const gchar *desc;
GstUri *query;
node = gst_amf_node_get_field (optional_args, "description");
if (!node) {
g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_PERMISSION_DENIED,
"Connect rejected; no description");
g_object_unref (task);
return;
}
desc = gst_amf_node_peek_string (node, NULL);
GST_DEBUG ("connect result desc: %s", GST_STR_NULL (desc));
if (authmod == GST_RTMP_AUTHMOD_AUTO && strstr (desc, "code=403 need auth")) {
if (strstr (desc, "authmod=adobe")) {
GST_INFO ("Reconnecting with authmod=adobe");
data->location.authmod = GST_RTMP_AUTHMOD_ADOBE;
socket_connect (task);
return;
}
g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_PERMISSION_DENIED,
"unhandled authentication mode: %s", desc);
g_object_unref (task);
return;
}
if (!g_regex_match (auth_regex, desc, 0, &match_info)) {
g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_PERMISSION_DENIED,
"failed to parse auth rejection: %s", desc);
g_object_unref (task);
return;
}
{
gchar *authmod_str = g_match_info_fetch_named (match_info, "authmod");
gchar *query_str = g_match_info_fetch_named (match_info, "query");
gboolean matches;
GST_INFO ("regex parsed auth: authmod=%s, query=%s",
GST_STR_NULL (authmod_str), GST_STR_NULL (query_str));
g_match_info_free (match_info);
switch (authmod) {
case GST_RTMP_AUTHMOD_ADOBE:
matches = g_str_equal (authmod_str, "adobe");
break;
default:
matches = FALSE;
break;
}
if (!matches) {
g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_PERMISSION_DENIED,
"server uses wrong authentication mode '%s'; expected %s",
GST_STR_NULL (authmod_str), gst_rtmp_authmod_get_nick (authmod));
g_object_unref (task);
g_free (authmod_str);
g_free (query_str);
return;
}
g_free (authmod_str);
query = gst_uri_from_string (query_str);
if (!query) {
g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_PERMISSION_DENIED,
"failed to parse authentication query '%s'",
GST_STR_NULL (query_str));
g_object_unref (task);
g_free (query_str);
return;
}
g_free (query_str);
}
{
const gchar *reason = gst_uri_get_query_value (query, "reason");
if (g_str_equal (reason, "authfailed")) {
g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_PERMISSION_DENIED,
"authentication failed! wrong credentials?");
g_object_unref (task);
gst_uri_unref (query);
return;
}
if (!g_str_equal (reason, "needauth")) {
g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_PERMISSION_DENIED,
"unhandled rejection reason '%s'", reason ? reason : "");
g_object_unref (task);
gst_uri_unref (query);
return;
}
}
g_warn_if_fail (!data->auth_query);
data->auth_query = do_adobe_auth (data->location.username,
data->location.password, gst_uri_get_query_value (query, "salt"),
gst_uri_get_query_value (query, "opaque"),
gst_uri_get_query_value (query, "challenge"));
gst_uri_unref (query);
if (!data->auth_query) {
g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_FAILED,
"couldn't generate adobe style authentication query");
g_object_unref (task);
return;
}
socket_connect (task);
return;
}
g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_FAILED,
"unhandled connect result code: %s", GST_STR_NULL (code));
g_object_unref (task);
}
/* prep key: pack 1st 16 chars into 4 LittleEndian ints */
static void
rtmp_tea_decode_prep_key (const gchar * key, guint32 out[4])
{
gchar copy[17];
g_return_if_fail (key);
g_return_if_fail (out);
/* ensure we can read 16 bytes */
strncpy (copy, key, 16);
/* placate GCC 8 -Wstringop-truncation */
copy[16] = 0;
out[0] = GST_READ_UINT32_LE (copy);
out[1] = GST_READ_UINT32_LE (copy + 4);
out[2] = GST_READ_UINT32_LE (copy + 8);
out[3] = GST_READ_UINT32_LE (copy + 12);
}
/* prep text: hex2bin, each 8 digits -> 4 chars -> 1 uint32 */
static GArray *
rtmp_tea_decode_prep_text (const gchar * text)
{
GArray *arr;
gsize len, i;
g_return_val_if_fail (text, NULL);
len = strlen (text);
arr = g_array_sized_new (TRUE, TRUE, 4, (len + 7) / 8);
for (i = 0; i < len; i += 8) {
gchar copy[9];
guchar chars[4];
gsize j;
guint32 val;
/* ensure we can read 8 bytes */
strncpy (copy, text + i, 8);
/* placate GCC 8 -Wstringop-truncation */
copy[8] = 0;
for (j = 0; j < 4; j++) {
gint hi, lo;
hi = g_ascii_xdigit_value (copy[2 * j]);
lo = g_ascii_xdigit_value (copy[2 * j + 1]);
chars[j] = (hi > 0 ? hi << 4 : 0) + (lo > 0 ? lo : 0);
}
val = GST_READ_UINT32_LE (chars);
g_array_append_val (arr, val);
}
return arr;
}
/* return text from uint32s to chars */
static gchar *
rtmp_tea_decode_return_text (GArray * arr)
{
#if G_BYTE_ORDER != G_LITTLE_ENDIAN
gsize i;
g_return_val_if_fail (arr, NULL);
for (i = 0; i < arr->len; i++) {
guint32 *val = &g_array_index (arr, guint32, i);
*val = GUINT32_TO_LE (*val);
}
#endif
/* array is alredy zero-terminated */
return g_array_free (arr, FALSE);
}
/* http://www.movable-type.co.uk/scripts/tea-block.html */
static void
rtmp_tea_decode_btea (GArray * text, guint32 key[4])
{
guint32 *v, n, *k;
guint32 z, y, sum = 0, e, DELTA = 0x9e3779b9;
guint32 p, q;
g_return_if_fail (text);
g_return_if_fail (text->len > 0);
g_return_if_fail (key);
v = (guint32 *) text->data;
n = text->len;
k = key;
z = v[n - 1];
y = v[0];
q = 6 + 52 / n;
sum = q * DELTA;
#define MX ((z>>5^y<<2) + (y>>3^z<<4)) ^ ((sum^y) + (k[(p&3)^e]^z));
while (sum != 0) {
e = sum >> 2 & 3;
for (p = n - 1; p > 0; p--)
z = v[p - 1], y = v[p] -= MX;
z = v[n - 1];
y = v[0] -= MX;
sum -= DELTA;
}
#undef MX
}
/* taken from librtmp */
static gchar *
rtmp_tea_decode (const gchar * bin_key, const gchar * hex_text)
{
guint32 key[4];
GArray *text;
rtmp_tea_decode_prep_key (bin_key, key);
text = rtmp_tea_decode_prep_text (hex_text);
rtmp_tea_decode_btea (text, key);
return rtmp_tea_decode_return_text (text);
}
static void
send_secure_token_response (GTask * task, GstRtmpConnection * connection,
const gchar * challenge)
{
ConnectTaskData *data = g_task_get_task_data (task);
if (challenge) {
GstAmfNode *node1;
GstAmfNode *node2;
gchar *response;
if (!data->location.secure_token || !data->location.secure_token[0]) {
g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_PERMISSION_DENIED,
"server requires secure token authentication");
g_object_unref (task);
return;
}
response = rtmp_tea_decode (data->location.secure_token, challenge);
GST_DEBUG ("response: %s", response);
node1 = gst_amf_node_new_null ();
node2 = gst_amf_node_new_take_string (response, -1);
gst_rtmp_connection_send_command (connection, NULL, NULL, 0,
"secureTokenResponse", node1, node2, NULL);
gst_amf_node_free (node1);
gst_amf_node_free (node2);
}
g_signal_handler_disconnect (connection, data->error_handler_id);
data->error_handler_id = 0;
g_task_return_pointer (task, g_object_ref (connection),
gst_rtmp_connection_close_and_unref);
g_object_unref (task);
}
GstRtmpConnection *
gst_rtmp_client_connect_finish (GAsyncResult * result, GError ** error)
{
GTask *task = G_TASK (result);
return g_task_propagate_pointer (task, error);
}
static void send_create_stream (GTask * task);
static void send_publish_or_play (GTask * task);
typedef struct
{
GstRtmpConnection *connection;
gulong error_handler_id;
gchar *stream;
gboolean publish;
guint32 id;
} StreamTaskData;
static StreamTaskData *
stream_task_data_new (GstRtmpConnection * connection, const gchar * stream,
gboolean publish)
{
StreamTaskData *data = g_slice_new0 (StreamTaskData);
data->connection = g_object_ref (connection);
data->stream = g_strdup (stream);
data->publish = publish;
return data;
}
static void
stream_task_data_free (gpointer ptr)
{
StreamTaskData *data = ptr;
g_clear_pointer (&data->stream, g_free);
if (data->error_handler_id) {
g_signal_handler_disconnect (data->connection, data->error_handler_id);
}
g_clear_object (&data->connection);
g_slice_free (StreamTaskData, data);
}
static void
start_stream (GstRtmpConnection * connection, const gchar * stream,
gboolean publish, GCancellable * cancellable,
GAsyncReadyCallback callback, gpointer user_data)
{
GTask *task;
StreamTaskData *data;
init_debug ();
task = g_task_new (connection, cancellable, callback, user_data);
if (!stream) {
g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_NOT_INITIALIZED,
"Stream is not set");
g_object_unref (task);
return;
}
data = stream_task_data_new (connection, stream, publish);
g_task_set_task_data (task, data, stream_task_data_free);
data->error_handler_id = g_signal_connect (connection,
"error", G_CALLBACK (connection_error), task);
send_create_stream (task);
}
void
gst_rtmp_client_start_publish_async (GstRtmpConnection * connection,
const gchar * stream, GCancellable * cancellable,
GAsyncReadyCallback callback, gpointer user_data)
{
start_stream (connection, stream, TRUE, cancellable, callback, user_data);
}
void
gst_rtmp_client_start_play_async (GstRtmpConnection * connection,
const gchar * stream, GCancellable * cancellable,
GAsyncReadyCallback callback, gpointer user_data)
{
start_stream (connection, stream, FALSE, cancellable, callback, user_data);
}
static void
send_set_buffer_length (GstRtmpConnection * connection, guint32 stream,
guint32 ms)
{
GstRtmpUserControl uc = {
.type = GST_RTMP_USER_CONTROL_TYPE_SET_BUFFER_LENGTH,
.param = stream,
.param2 = ms,
};
gst_rtmp_connection_queue_message (connection,
gst_rtmp_message_new_user_control (&uc));
}
static void
send_create_stream (GTask * task)
{
GstRtmpConnection *connection = g_task_get_source_object (task);
StreamTaskData *data = g_task_get_task_data (task);
GstAmfNode *command_object, *stream_name;
command_object = gst_amf_node_new_null ();
stream_name = gst_amf_node_new_string (data->stream, -1);
if (data->publish) {
/* Not part of RTMP documentation */
GST_DEBUG ("Releasing stream '%s'", data->stream);
gst_rtmp_connection_send_command (connection, NULL, NULL, 0,
"releaseStream", command_object, stream_name, NULL);
gst_rtmp_connection_send_command (connection, NULL, NULL, 0,
"FCPublish", command_object, stream_name, NULL);
} else {
/* Matches librtmp */
gst_rtmp_connection_request_window_size (connection,
GST_RTMP_DEFAULT_WINDOW_ACK_SIZE);
send_set_buffer_length (connection, 0, 300);
}
GST_INFO ("Creating stream '%s'", data->stream);
gst_rtmp_connection_send_command (connection, create_stream_done, task, 0,
"createStream", command_object, NULL);
gst_amf_node_free (stream_name);
gst_amf_node_free (command_object);
}
static void
create_stream_done (const gchar * command_name, GPtrArray * args,
gpointer user_data)
{
GTask *task = G_TASK (user_data);
StreamTaskData *data = g_task_get_task_data (task);
GstAmfNode *result;
if (g_task_return_error_if_cancelled (task)) {
g_object_unref (task);
return;
}
if (!args) {
g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_FAILED,
"createStream failed: %s", command_name);
g_object_unref (task);
return;
}
if (args->len < 2) {
g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_FAILED,
"createStream failed; not enough return arguments");
g_object_unref (task);
return;
}
result = g_ptr_array_index (args, 1);
if (gst_amf_node_get_type (result) != GST_AMF_TYPE_NUMBER) {
GString *error_dump = g_string_new ("");
gst_amf_node_dump (result, -1, error_dump);
g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_FAILED,
"createStream failed: %s", error_dump->str);
g_object_unref (task);
g_string_free (error_dump, TRUE);
return;
}
data->id = gst_amf_node_get_number (result);
GST_INFO ("createStream success, stream_id=%" G_GUINT32_FORMAT, data->id);
if (data->id == 0) {
g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_INVALID_DATA,
"createStream returned ID 0");
g_object_unref (task);
return;
}
send_publish_or_play (task);
}
static void
send_publish_or_play (GTask * task)
{
GstRtmpConnection *connection = g_task_get_source_object (task);
StreamTaskData *data = g_task_get_task_data (task);
const gchar *command = data->publish ? "publish" : "play";
GstAmfNode *command_object, *stream_name, *argument;
command_object = gst_amf_node_new_null ();
stream_name = gst_amf_node_new_string (data->stream, -1);
if (data->publish) {
/* publishing type (live, record, append) */
argument = gst_amf_node_new_string ("live", -1);
} else {
/* "Start" argument: -2 = live or recording, -1 = only live
0 or positive = only recording, seek to X seconds */
argument = gst_amf_node_new_number (-2);
}
GST_INFO ("Sending %s for '%s' on stream %" G_GUINT32_FORMAT,
command, data->stream, data->id);
gst_rtmp_connection_expect_command (connection, on_publish_or_play_status,
task, data->id, "onStatus");
gst_rtmp_connection_send_command (connection, NULL, NULL, data->id,
command, command_object, stream_name, argument, NULL);
if (!data->publish) {
/* Matches librtmp */
send_set_buffer_length (connection, data->id, 30000);
}
gst_amf_node_free (command_object);
gst_amf_node_free (stream_name);
gst_amf_node_free (argument);
}
static void
on_publish_or_play_status (const gchar * command_name, GPtrArray * args,
gpointer user_data)
{
GTask *task = G_TASK (user_data);
GstRtmpConnection *connection = g_task_get_source_object (task);
StreamTaskData *data = g_task_get_task_data (task);
const gchar *command = data->publish ? "publish" : "play", *code = NULL;
GString *info_dump;
if (g_task_return_error_if_cancelled (task)) {
g_object_unref (task);
return;
}
if (!args) {
g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_FAILED,
"%s failed: %s", command, command_name);
g_object_unref (task);
return;
}
if (args->len < 2) {
g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_FAILED,
"%s failed; not enough return arguments", command);
g_object_unref (task);
return;
}
{
const GstAmfNode *info_object, *code_object;
info_object = g_ptr_array_index (args, 1);
code_object = gst_amf_node_get_field (info_object, "code");
if (code_object) {
code = gst_amf_node_peek_string (code_object, NULL);
}
info_dump = g_string_new ("");
gst_amf_node_dump (info_object, -1, info_dump);
}
if (data->publish) {
if (g_strcmp0 (code, "NetStream.Publish.Start") == 0) {
GST_INFO ("publish success: %s", info_dump->str);
g_task_return_boolean (task, TRUE);
goto out;
}
if (g_strcmp0 (code, "NetStream.Publish.BadName") == 0) {
g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_EXISTS,
"publish denied: stream already exists: %s", info_dump->str);
goto out;
}
if (g_strcmp0 (code, "NetStream.Publish.Denied") == 0) {
g_task_return_new_error (task, G_IO_ERROR,
G_IO_ERROR_PERMISSION_DENIED, "publish denied: %s", info_dump->str);
goto out;
}
} else {
if (g_strcmp0 (code, "NetStream.Play.Start") == 0 ||
g_strcmp0 (code, "NetStream.Play.Reset") == 0) {
GST_INFO ("play success: %s", info_dump->str);
g_task_return_boolean (task, TRUE);
goto out;
}
if (g_strcmp0 (code, "NetStream.Play.StreamNotFound") == 0) {
g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_NOT_FOUND,
"play denied: stream not found: %s", info_dump->str);
goto out;
}
}
g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_FAILED,
"unhandled %s result: %s", command, info_dump->str);
out:
g_string_free (info_dump, TRUE);
g_signal_handler_disconnect (connection, data->error_handler_id);
data->error_handler_id = 0;
g_object_unref (task);
}
static gboolean
start_stream_finish (GstRtmpConnection * connection,
GAsyncResult * result, guint32 * stream_id, GError ** error)
{
GTask *task;
StreamTaskData *data;
g_return_val_if_fail (g_task_is_valid (result, connection), FALSE);
task = G_TASK (result);
if (!g_task_propagate_boolean (G_TASK (result), error)) {
return FALSE;
}
data = g_task_get_task_data (task);
if (stream_id) {
*stream_id = data->id;
}
return TRUE;
}
gboolean
gst_rtmp_client_start_publish_finish (GstRtmpConnection * connection,
GAsyncResult * result, guint32 * stream_id, GError ** error)
{
return start_stream_finish (connection, result, stream_id, error);
}
gboolean
gst_rtmp_client_start_play_finish (GstRtmpConnection * connection,
GAsyncResult * result, guint32 * stream_id, GError ** error)
{
return start_stream_finish (connection, result, stream_id, error);
}
void
gst_rtmp_client_stop_publish (GstRtmpConnection * connection,
const gchar * stream, const GstRtmpStopCommands stop_commands)
{
send_stop (connection, stream, stop_commands);
}
static void
send_stop (GstRtmpConnection * connection, const gchar * stream,
const GstRtmpStopCommands stop_commands)
{
GstAmfNode *command_object, *stream_name;
command_object = gst_amf_node_new_null ();
stream_name = gst_amf_node_new_string (stream, -1);
if (stop_commands & GST_RTMP_STOP_COMMANDS_FCUNPUBLISH) {
GST_DEBUG ("Sending stop command 'FCUnpublish' for stream '%s'", stream);
gst_rtmp_connection_send_command (connection, NULL, NULL, 0,
"FCUnpublish", command_object, stream_name, NULL);
}
if (stop_commands & GST_RTMP_STOP_COMMANDS_CLOSE_STREAM) {
GST_DEBUG ("Sending stop command 'closeStream' for stream '%s'", stream);
gst_rtmp_connection_send_command (connection, NULL, NULL, 0,
"closeStream", command_object, stream_name, NULL);
}
if (stop_commands & GST_RTMP_STOP_COMMANDS_DELETE_STREAM) {
GST_DEBUG ("Sending stop command 'deleteStream' for stream '%s'", stream);
gst_rtmp_connection_send_command (connection, NULL, NULL, 0,
"deleteStream", command_object, stream_name, NULL);
}
gst_amf_node_free (stream_name);
gst_amf_node_free (command_object);
}