gstreamer/subprojects/gst-plugins-bad/gst/rtmp2/rtmp/rtmpclient.c
Jordan Petridis b6c577c70c rtmp2: reimplement librtmp's connection parameters for the connect packet
librtmp allows for attaching arbitrary AMF objects to the end of the
connect packet, and this is commonly used for authenticating with
servers.

Add a new property, extra-connect-args, that mimics librtmp's behavior.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/7054>
2024-08-13 21:50:17 +00:00

1605 lines
46 KiB
C

/* GStreamer RTMP Library
* Copyright (C) 2013 David Schleef <ds@schleef.org>
* Copyright (C) 2017 Make.TV, Inc. <info@make.tv>
* Contact: Jan Alexander Steffens (heftig) <jsteffens@make.tv>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
* Boston, MA 02110-1335, USA.
*/
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#include <gst/gst.h>
#include <gio/gio.h>
#include <string.h>
#include "rtmpclient.h"
#include "rtmphandshake.h"
#include "rtmpmessage.h"
#include "rtmputils.h"
/* Debug category used by every GST_* logging macro in this file; it is
 * initialized lazily by init_debug (). */
GST_DEBUG_CATEGORY_STATIC (gst_rtmp_client_debug_category);
#define GST_CAT_DEFAULT gst_rtmp_client_debug_category

/* Command callbacks, declared early because init_debug () registers them
 * with GST_DEBUG_REGISTER_FUNCPTR before their definitions. */
static void send_connect_done (const gchar * command_name, GPtrArray * args,
    gpointer user_data);
static void create_stream_done (const gchar * command_name, GPtrArray * args,
    gpointer user_data);
static void on_publish_or_play_status (const gchar * command_name,
    GPtrArray * args, gpointer user_data);
GQuark
gst_rtmp_conn_parsing_error_quark (void)
{
static GQuark quark = 0;
if (!quark)
quark = g_quark_from_static_string ("gst-rtmp-conn-parsing-error-quark");
return quark;
}
/* One-time setup of the "rtmpclient" debug category and registration of
 * the command callbacks so they can be named in debug output. Subsequent
 * calls return immediately. */
static void
init_debug (void)
{
  static gsize initialized = 0;

  if (!g_once_init_enter (&initialized))
    return;

  GST_DEBUG_CATEGORY_INIT (gst_rtmp_client_debug_category,
      "rtmpclient", 0, "debug category for the rtmp client");
  GST_DEBUG_REGISTER_FUNCPTR (send_connect_done);
  GST_DEBUG_REGISTER_FUNCPTR (create_stream_done);
  GST_DEBUG_REGISTER_FUNCPTR (on_publish_or_play_status);

  g_once_init_leave (&initialized, 1);
}
/* URI scheme names, indexed by scheme value (the lookup helpers below use
 * the array index as the GstRtmpScheme). The trailing NULL lets the array
 * be handed out as a NULL-terminated string vector. */
static const gchar *scheme_strings[] = {
  "rtmp",
  "rtmps",
  NULL
};

/* Number of real entries in scheme_strings, excluding the NULL terminator. */
#define NUM_SCHEMES (G_N_ELEMENTS (scheme_strings) - 1)
GType
gst_rtmp_scheme_get_type (void)
{
static gsize scheme_type = 0;
static const GEnumValue scheme[] = {
{GST_RTMP_SCHEME_RTMP, "GST_RTMP_SCHEME_RTMP", "rtmp"},
{GST_RTMP_SCHEME_RTMPS, "GST_RTMP_SCHEME_RTMPS", "rtmps"},
{0, NULL, NULL},
};
if (g_once_init_enter (&scheme_type)) {
GType tmp = g_enum_register_static ("GstRtmpScheme", scheme);
g_once_init_leave (&scheme_type, tmp);
}
return (GType) scheme_type;
}
/* Maps a scheme name to its GstRtmpScheme value by exact, case-sensitive
 * comparison against scheme_strings.
 * Returns -1 when @string is NULL or does not name a known scheme. */
GstRtmpScheme
gst_rtmp_scheme_from_string (const gchar * string)
{
  gint idx;

  if (string == NULL)
    return -1;

  for (idx = 0; idx < NUM_SCHEMES; idx++) {
    if (!strcmp (scheme_strings[idx], string))
      return idx;
  }

  return -1;
}
/* Determines the scheme of @uri. A URI without a scheme is treated as
 * plain RTMP; otherwise the scheme name is resolved through
 * gst_rtmp_scheme_from_string (). */
GstRtmpScheme
gst_rtmp_scheme_from_uri (const GstUri * uri)
{
  const gchar *uri_scheme = gst_uri_get_scheme (uri);

  if (uri_scheme)
    return gst_rtmp_scheme_from_string (uri_scheme);

  return GST_RTMP_SCHEME_RTMP;
}
/* Returns the static name of @scheme, or the literal "invalid" when the
 * value lies outside the scheme_strings table. */
const gchar *
gst_rtmp_scheme_to_string (GstRtmpScheme scheme)
{
  if (scheme < 0 || scheme >= NUM_SCHEMES)
    return "invalid";

  return scheme_strings[scheme];
}
/* Exposes the NULL-terminated table of known scheme names. The returned
 * array is static storage and must not be freed. */
const gchar *const *
gst_rtmp_scheme_get_strings (void)
{
  const gchar *const *known_schemes = scheme_strings;

  return known_schemes;
}
/* Returns the default TCP port for @scheme: 1935 for rtmp, 443 for rtmps.
 * Any other value goes through g_return_val_if_reached () and yields 0. */
guint
gst_rtmp_scheme_get_default_port (GstRtmpScheme scheme)
{
  if (scheme == GST_RTMP_SCHEME_RTMP)
    return 1935;

  if (scheme == GST_RTMP_SCHEME_RTMPS)
    return 443;

  g_return_val_if_reached (0);
}
GType
gst_rtmp_authmod_get_type (void)
{
static gsize authmod_type = 0;
static const GEnumValue authmod[] = {
{GST_RTMP_AUTHMOD_NONE, "GST_RTMP_AUTHMOD_NONE", "none"},
{GST_RTMP_AUTHMOD_AUTO, "GST_RTMP_AUTHMOD_AUTO", "auto"},
{GST_RTMP_AUTHMOD_ADOBE, "GST_RTMP_AUTHMOD_ADOBE", "adobe"},
{0, NULL, NULL},
};
if (g_once_init_enter (&authmod_type)) {
GType tmp = g_enum_register_static ("GstRtmpAuthmod", authmod);
g_once_init_leave (&authmod_type, tmp);
}
return (GType) authmod_type;
}
/* Looks up the nick of an authmod enum value for use in messages.
 * Falls back to "(unknown)" when the enum class is not available
 * (g_type_class_peek returned NULL) or the value is not registered. */
static const gchar *
gst_rtmp_authmod_get_nick (GstRtmpAuthmod value)
{
  GEnumClass *enum_class = g_type_class_peek (GST_TYPE_RTMP_AUTHMOD);
  GEnumValue *entry = NULL;

  if (enum_class)
    entry = g_enum_get_value (enum_class, value);

  if (!entry)
    return "(unknown)";

  return entry->value_nick;
}
GType
gst_rtmp_stop_commands_get_type (void)
{
static gsize stop_commands_type = 0;
static const GFlagsValue stop_commands[] = {
{GST_RTMP_STOP_COMMANDS_NONE, "No command", "none"},
{GST_RTMP_STOP_COMMANDS_FCUNPUBLISH, "FCUnpublish", "fcunpublish"},
{GST_RTMP_STOP_COMMANDS_CLOSE_STREAM, "closeStream", "closestream"},
{GST_RTMP_STOP_COMMANDS_DELETE_STREAM, "deleteStream", "deletestream"},
{0, NULL, NULL},
};
if (g_once_init_enter (&stop_commands_type)) {
GType tmp = g_flags_register_static ("GstRtmpStopCommands", stop_commands);
g_once_init_leave (&stop_commands_type, tmp);
}
return (GType) stop_commands_type;
}
/* Deep-copies @src into @dest: scalar members are assigned, string
 * members are duplicated with g_strdup ().
 * Existing string pointers in @dest are overwritten without being freed,
 * so @dest is expected to be zeroed or cleared beforehand. */
void
gst_rtmp_location_copy (GstRtmpLocation * dest, const GstRtmpLocation * src)
{
  g_return_if_fail (dest);
  g_return_if_fail (src);

  /* Plain values */
  dest->scheme = src->scheme;
  dest->port = src->port;
  dest->authmod = src->authmod;
  dest->timeout = src->timeout;
  dest->tls_flags = src->tls_flags;
  dest->publish = src->publish;

  /* Owned strings */
  dest->host = g_strdup (src->host);
  dest->application = g_strdup (src->application);
  dest->stream = g_strdup (src->stream);
  dest->username = g_strdup (src->username);
  dest->password = g_strdup (src->password);
  dest->secure_token = g_strdup (src->secure_token);
  dest->extra_connect_args = g_strdup (src->extra_connect_args);
  dest->flash_ver = g_strdup (src->flash_ver);
}
/* Releases every string owned by @location and resets port and publish.
 * The scheme, authmod, timeout and tls_flags members are left untouched. */
void
gst_rtmp_location_clear (GstRtmpLocation * location)
{
  g_return_if_fail (location);

  /* Plain values */
  location->port = 0;
  location->publish = FALSE;

  /* Owned strings */
  g_clear_pointer (&location->host, g_free);
  g_clear_pointer (&location->application, g_free);
  g_clear_pointer (&location->stream, g_free);
  g_clear_pointer (&location->username, g_free);
  g_clear_pointer (&location->password, g_free);
  g_clear_pointer (&location->secure_token, g_free);
  g_clear_pointer (&location->extra_connect_args, g_free);
  g_clear_pointer (&location->flash_ver, g_free);
}
/* Builds "scheme://host[:port]/application" and, when @with_stream is
 * TRUE, appends "/stream". The port is omitted when it equals the default
 * port of the scheme.
 * Returns a newly allocated string owned by the caller. */
gchar *
gst_rtmp_location_get_string (const GstRtmpLocation * location,
    gboolean with_stream)
{
  const gchar *scheme_name;
  gchar *prefix, *result;
  guint fallback_port, uri_port;
  GstUri *base_uri;

  g_return_val_if_fail (location, NULL);

  scheme_name = gst_rtmp_scheme_to_string (location->scheme);
  fallback_port = gst_rtmp_scheme_get_default_port (location->scheme);

  if (location->port == fallback_port)
    uri_port = GST_URI_NO_PORT;
  else
    uri_port = location->port;

  base_uri = gst_uri_new (scheme_name, NULL, location->host, uri_port, "/",
      NULL, NULL);
  prefix = gst_uri_to_string (base_uri);

  /* g_strconcat () stops at the first NULL argument, so a NULL
   * application or stream simply ends the result early. */
  if (with_stream) {
    result = g_strconcat (prefix, location->application, "/",
        location->stream, NULL);
  } else {
    result = g_strconcat (prefix, location->application, NULL);
  }

  g_free (prefix);
  gst_uri_unref (base_uri);

  return result;
}
/* Flag values for the audioCodecs property,
 * rtmp_specification_1.0.pdf page 32.
 * Each codec occupies one bit so the values can be OR-ed into a mask. */
enum
{
  SUPPORT_SND_NONE = 0x001,     /* Raw sound, no compression */
  SUPPORT_SND_ADPCM = 0x002,    /* ADPCM compression */
  SUPPORT_SND_MP3 = 0x004,      /* mp3 compression */
  SUPPORT_SND_INTEL = 0x008,    /* Not used */
  SUPPORT_SND_UNUSED = 0x010,   /* Not used */
  SUPPORT_SND_NELLY8 = 0x020,   /* NellyMoser at 8-kHz compression */
  SUPPORT_SND_NELLY = 0x040,    /* NellyMoser compression
                                 * (5, 11, 22, and 44 kHz) */
  SUPPORT_SND_G711A = 0x080,    /* G711A sound compression
                                 * (Flash Media Server only) */
  SUPPORT_SND_G711U = 0x100,    /* G711U sound compression
                                 * (Flash Media Server only) */
  SUPPORT_SND_NELLY16 = 0x200,  /* NellyMoser at 16-kHz compression */
  SUPPORT_SND_AAC = 0x400,      /* Advanced audio coding (AAC) codec */
  SUPPORT_SND_SPEEX = 0x800,    /* Speex Audio */
  SUPPORT_SND_ALL = 0xFFF,      /* All RTMP-supported audio codecs */
};

/* audioCodecs value sent by libavformat. All "used" codecs, i.e. every
 * bit except the two marked "Not used" above. send_connect () sends this
 * value in the connect command when not publishing. */
#define GST_RTMP_AUDIOCODECS \
  (SUPPORT_SND_ALL & ~SUPPORT_SND_INTEL & ~SUPPORT_SND_UNUSED)
G_STATIC_ASSERT (GST_RTMP_AUDIOCODECS == 4071); /* libavformat's magic number */
/* Flag values for the videoCodecs property,
 * rtmp_specification_1.0.pdf page 32.
 * Each codec occupies one bit so the values can be OR-ed into a mask. */
enum
{
  SUPPORT_VID_UNUSED = 0x01,    /* Obsolete value */
  SUPPORT_VID_JPEG = 0x02,      /* Obsolete value */
  SUPPORT_VID_SORENSON = 0x04,  /* Sorenson Flash video */
  SUPPORT_VID_HOMEBREW = 0x08,  /* V1 screen sharing */
  SUPPORT_VID_VP6 = 0x10,       /* On2 video (Flash 8+) */
  SUPPORT_VID_VP6ALPHA = 0x20,  /* On2 video with alpha channel */
  SUPPORT_VID_HOMEBREWV = 0x40, /* Screen sharing version 2 (Flash 8+) */
  SUPPORT_VID_H264 = 0x80,      /* H264 video */
  SUPPORT_VID_ALL = 0xFF,       /* All RTMP-supported video codecs */
};

/* videoCodecs value sent by libavformat. All non-obsolete codecs, i.e.
 * every bit except the two marked "Obsolete value" above. send_connect ()
 * sends this value in the connect command when not publishing. */
#define GST_RTMP_VIDEOCODECS \
  (SUPPORT_VID_ALL & ~SUPPORT_VID_UNUSED & ~SUPPORT_VID_JPEG)
G_STATIC_ASSERT (GST_RTMP_VIDEOCODECS == 252);  /* libavformat's magic number */
/* Flag values for the videoFunction property,
 * rtmp_specification_1.0.pdf page 32 */
enum
{
  /* Indicates that the client can perform frame-accurate seeks. */
  SUPPORT_VID_CLIENT_SEEK = 1,
};

/* videoFunction value sent by libavformat. send_connect () sends this
 * value in the connect command when not publishing. */
#define GST_RTMP_VIDEOFUNCTION (SUPPORT_VID_CLIENT_SEEK)
G_STATIC_ASSERT (GST_RTMP_VIDEOFUNCTION == 1);  /* libavformat's magic number */
/* Steps of the asynchronous connect sequence, in the order they run:
 * socket_connect -> socket_connect_done -> handshake_done -> send_connect
 * -> send_connect_done -> send_secure_token_response. */
static void socket_connect (GTask * task);
static void socket_connect_done (GObject * source, GAsyncResult * result,
    gpointer user_data);
static void handshake_done (GObject * source, GAsyncResult * result,
    gpointer user_data);
static void send_connect (GTask * task);
static void send_stop (GstRtmpConnection * connection, const gchar * stream,
    const GstRtmpStopCommands stop_commands);
static void send_secure_token_response (GTask * task,
    GstRtmpConnection * connection, const gchar * challenge);
/* "error" signal handler attached to the connection while connecting. */
static void connection_error (GstRtmpConnection * connection,
    const GError * error, gpointer user_data);

/* Socket timeout that socket_connect () substitutes when the location's
 * timeout is negative; it is passed to g_socket_client_set_timeout (). */
#define DEFAULT_TIMEOUT 5
/* Per-task state of one gst_rtmp_client_connect_async () operation.
 * Stored as the GTask's task data and released by
 * connect_task_data_free (). */
typedef struct
{
  /* Private deep copy of the caller's location. */
  GstRtmpLocation location;
  /* Adobe auth query built by do_adobe_auth (); when set, send_connect ()
   * appends it to the app and tcUrl strings. NULL until needed. */
  gchar *auth_query;
  /* Connection created in handshake_done (); NULL before that and after
   * socket_connect () tears it down for a reconnect. */
  GstRtmpConnection *connection;
  /* Handler id of the "error" signal connection on @connection, or 0 when
   * no handler is attached. */
  gulong error_handler_id;
} ConnectTaskData;
/* Allocates zeroed task data holding a deep copy of @location.
 * Free with connect_task_data_free (). */
static ConnectTaskData *
connect_task_data_new (const GstRtmpLocation * location)
{
  ConnectTaskData *task_data;

  task_data = g_new0 (ConnectTaskData, 1);
  gst_rtmp_location_copy (&task_data->location, location);

  return task_data;
}
/* GDestroyNotify for ConnectTaskData: detaches the error handler (if any),
 * drops the connection reference and frees all owned strings. */
static void
connect_task_data_free (gpointer ptr)
{
  ConnectTaskData *task_data = ptr;

  /* The handler must be disconnected while the connection is still held. */
  if (task_data->error_handler_id != 0) {
    g_signal_handler_disconnect (task_data->connection,
        task_data->error_handler_id);
  }
  g_clear_object (&task_data->connection);

  g_clear_pointer (&task_data->auth_query, g_free);
  gst_rtmp_location_clear (&task_data->location);

  g_free (task_data);
}
/* Regex matching the server's Adobe-auth rejection description; compiled
 * once by gst_rtmp_client_connect_async () and used by send_connect_done (). */
static GRegex *auth_regex = NULL;

/* Starts an asynchronous RTMP connect to @location.
 * A private copy of @location is attached to a new GTask, then the socket
 * connection is started. @callback is invoked with the task as the
 * GAsyncResult; collect the outcome with gst_rtmp_client_connect_finish (). */
void
gst_rtmp_client_connect_async (const GstRtmpLocation * location,
    GCancellable * cancellable, GAsyncReadyCallback callback,
    gpointer user_data)
{
  ConnectTaskData *task_data;
  GTask *connect_task;

  init_debug ();

  if (g_once_init_enter (&auth_regex)) {
    GRegex *compiled = g_regex_new ("\\[ *AccessManager.Reject *\\] *: *"
        "\\[ *authmod=(?<authmod>.*?) *\\] *: *"
        "(?<query>\\?.*)\\Z", G_REGEX_DOTALL, 0, NULL);

    g_once_init_leave (&auth_regex, compiled);
  }

  connect_task = g_task_new (NULL, cancellable, callback, user_data);
  task_data = connect_task_data_new (location);
  g_task_set_task_data (connect_task, task_data, connect_task_data_free);

  socket_connect (connect_task);
}
/* First step of the connect sequence (also used to reconnect for
 * authentication): tears down any previous connection held by the task,
 * validates host/port/scheme and starts the asynchronous socket
 * connection. Takes over the caller's reference on @task; on validation
 * failure the task is completed with an error and unreffed here. */
static void
socket_connect (GTask * task)
{
  ConnectTaskData *data = g_task_get_task_data (task);
  GSocketClient *client;
  GSocketConnectable *target;

  if (data->location.timeout < 0)
    data->location.timeout = DEFAULT_TIMEOUT;

  /* Drop leftovers from an earlier attempt before starting over. */
  if (data->error_handler_id) {
    g_signal_handler_disconnect (data->connection, data->error_handler_id);
    data->error_handler_id = 0;
  }

  if (data->connection) {
    gst_rtmp_connection_close (data->connection);
    g_clear_object (&data->connection);
  }

  if (!data->location.host) {
    g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_NOT_INITIALIZED,
        "Host is not set");
    g_object_unref (task);
    return;
  }

  if (!data->location.port) {
    g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_NOT_INITIALIZED,
        "Port is not set");
    g_object_unref (task);
    return;
  }

  client = g_socket_client_new ();
  g_socket_client_set_timeout (client, data->location.timeout);

  if (data->location.scheme == GST_RTMP_SCHEME_RTMPS) {
    GST_DEBUG ("Configuring TLS, validation flags 0x%02x",
        data->location.tls_flags);
    g_socket_client_set_tls (client, TRUE);
    G_GNUC_BEGIN_IGNORE_DEPRECATIONS;
    g_socket_client_set_tls_validation_flags (client,
        data->location.tls_flags);
    G_GNUC_END_IGNORE_DEPRECATIONS;
  } else if (data->location.scheme != GST_RTMP_SCHEME_RTMP) {
    /* Neither plain RTMP nor RTMPS */
    g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED,
        "Invalid scheme ID %d", data->location.scheme);
    g_object_unref (client);
    g_object_unref (task);
    return;
  }

  target = g_network_address_new (data->location.host, data->location.port);

  GST_DEBUG ("Starting socket connection");
  g_socket_client_connect_async (client, target,
      g_task_get_cancellable (task), socket_connect_done, task);

  g_object_unref (target);
  g_object_unref (client);
}
/* Completion callback of g_socket_client_connect_async ().
 * On success the RTMP handshake is started on the new socket connection;
 * on failure or cancellation the task is completed with an error and
 * unreffed. @user_data is the GTask driving the connect sequence. */
static void
socket_connect_done (GObject * source, GAsyncResult * result,
    gpointer user_data)
{
  GSocketClient *socket_client = G_SOCKET_CLIENT (source);
  GSocketConnection *socket_connection;
  GTask *task = user_data;
  GError *error = NULL;

  socket_connection =
      g_socket_client_connect_finish (socket_client, result, &error);

  if (g_task_return_error_if_cancelled (task)) {
    GST_DEBUG ("Socket connection was cancelled");
    /* The task already carries its own cancellation error, so whatever
     * the finish call handed us (an error, or a connection that got
     * established just before the cancel) must be released here. */
    g_clear_error (&error);
    g_clear_object (&socket_connection);
    g_object_unref (task);
    return;
  }

  if (socket_connection == NULL) {
    GST_ERROR ("Socket connection error");
    /* g_task_return_error () takes ownership of @error. */
    g_task_return_error (task, error);
    g_object_unref (task);
    return;
  }

  GST_DEBUG ("Socket connection established");

  gst_rtmp_client_handshake (G_IO_STREAM (socket_connection), FALSE,
      g_task_get_cancellable (task), handshake_done, task);
  g_object_unref (socket_connection);
}
/* Completion callback of gst_rtmp_client_handshake ().
 * A failed handshake closes the stream and fails the task. Otherwise the
 * GstRtmpConnection is created, its "error" signal is routed to the task
 * and the connect command is sent. */
static void
handshake_done (GObject * source, GAsyncResult * result, gpointer user_data)
{
  GTask *task = user_data;
  ConnectTaskData *data = g_task_get_task_data (task);
  GIOStream *io_stream = G_IO_STREAM (source);
  GSocketConnection *socket_connection = G_SOCKET_CONNECTION (io_stream);
  GError *handshake_error = NULL;

  if (!gst_rtmp_client_handshake_finish (io_stream, result,
          &handshake_error)) {
    g_io_stream_close_async (io_stream, G_PRIORITY_DEFAULT, NULL, NULL, NULL);
    g_task_return_error (task, handshake_error);
    g_object_unref (task);
    return;
  }

  data->connection = gst_rtmp_connection_new (socket_connection,
      g_task_get_cancellable (task));
  data->error_handler_id = g_signal_connect (data->connection,
      "error", G_CALLBACK (connection_error), task);

  send_connect (task);
}
/* "error" signal handler installed while connecting: forwards a copy of
 * the connection's error to the task (@user_data), unless the task has
 * already failed. */
static void
connection_error (GstRtmpConnection * connection, const GError * error,
    gpointer user_data)
{
  GTask *task = user_data;

  if (g_task_had_error (task))
    return;

  g_task_return_error (task, g_error_copy (error));
}
/* Computes the Adobe authentication query string.
 *
 * hash1    = base64 (md5 (username + salt + password))
 * response = base64 (md5 (hash1 + (opaque, else challenge, if any)
 *                         + client challenge))
 *
 * The client challenge is a random 32-bit value printed as 8 hex digits.
 * Returns a newly allocated "authmod=adobe&user=..." string (with
 * "&opaque=..." appended when @opaque is set), or NULL if @username,
 * @password or @salt is missing. */
static gchar *
do_adobe_auth (const gchar * username, const gchar * password,
    const gchar * salt, const gchar * opaque, const gchar * challenge)
{
  guint8 digest[16];            /* MD5 digest */
  gsize digest_len = sizeof digest;
  gchar *client_challenge, *encoded, *result;
  GChecksum *checksum;

  g_return_val_if_fail (username, NULL);
  g_return_val_if_fail (password, NULL);
  g_return_val_if_fail (salt, NULL);

  /* First round: hash the credentials together with the server salt. */
  checksum = g_checksum_new (G_CHECKSUM_MD5);
  g_checksum_update (checksum, (guchar *) username, -1);
  g_checksum_update (checksum, (guchar *) salt, -1);
  g_checksum_update (checksum, (guchar *) password, -1);

  g_checksum_get_digest (checksum, digest, &digest_len);
  g_warn_if_fail (digest_len == sizeof digest);

  /* Second round starts from the base64 form of the first digest. */
  encoded = g_base64_encode ((guchar *) digest, sizeof digest);
  g_checksum_reset (checksum);
  g_checksum_update (checksum, (guchar *) encoded, -1);
  g_free (encoded);

  /* opaque takes precedence over the server challenge */
  if (opaque) {
    g_checksum_update (checksum, (guchar *) opaque, -1);
  } else if (challenge) {
    g_checksum_update (checksum, (guchar *) challenge, -1);
  }

  client_challenge = g_strdup_printf ("%08x", g_random_int ());
  g_checksum_update (checksum, (guchar *) client_challenge, -1);

  g_checksum_get_digest (checksum, digest, &digest_len);
  g_warn_if_fail (digest_len == sizeof digest);

  encoded = g_base64_encode ((guchar *) digest, sizeof digest);

  if (!opaque) {
    result =
        g_strdup_printf ("authmod=%s&user=%s&challenge=%s&response=%s",
        "adobe", username, client_challenge, encoded);
  } else {
    result =
        g_strdup_printf
        ("authmod=%s&user=%s&challenge=%s&response=%s&opaque=%s", "adobe",
        username, client_challenge, encoded, opaque);
  }

  g_free (encoded);
  g_checksum_free (checksum);
  g_free (client_challenge);

  return result;
}
// Converts one librtmp-style connection token into an AMF node.
//
// @type is the single-character type prefix and @value the text after the
// colon:
//   'N' number  - @value must be a complete, non-empty decimal number
//   'S' string  - @value is used verbatim
//   'B' boolean - non-zero integer is TRUE; anything unparsable is FALSE
//   'Z' null    - @value is ignored
//   'O' object  - not supported yet
//
// Returns a new node owned by the caller, or NULL with @error set.
static GstAmfNode *
parse_conn_token (gchar type, const gchar * value, GError ** error)
{
  gchar *end_ptr;

  // This function returns a pointer, so the failure value is NULL
  g_return_val_if_fail (error == NULL || *error == NULL, NULL);
  g_return_val_if_fail (value != NULL, NULL);
  // Function called without a type
  g_assert (type);

  GST_TRACE ("Parsing Connection token of Type: %c and Value: %s", type, value);

  switch (type) {
    case 'N':
    {
      gdouble num;

      // Empty value
      if (value[0] == '\0') {
        g_set_error (error,
            GST_RTMP_CONN_PARSING_ERROR,
            GST_RTMP_CONN_PARSING_ERROR_INVALID_VALUE,
            "Found Numeric type, but the value is an empty string");
        return NULL;
      }

      num = g_ascii_strtod (value, &end_ptr);
      // Trailing garbage means the whole value is not a number
      if (end_ptr[0] != '\0') {
        g_set_error (error,
            GST_RTMP_CONN_PARSING_ERROR,
            GST_RTMP_CONN_PARSING_ERROR_FAILED_PARSING_DOUBLE,
            "Failed to convert %s to double", value);
        return NULL;
      }

      return gst_amf_node_new_number (num);
    }
    case 'S':
      return gst_amf_node_new_string (value, -1);
    case 'B':
    {
      // We are mimicking the behavior of librtmp here, which
      // is using atoi and thus every invalid string is false
      // https://salsa.debian.org/multimedia-team/rtmpdump/-/blob/a56abc82a99e8c4497a421d9dbc06e4544ade200/librtmp/rtmp.c#L632-634
      guint64 parsed = g_ascii_strtoull (value, &end_ptr, 10);

      if (end_ptr[0] != '\0') {
        return gst_amf_node_new_boolean (FALSE);
      }

      // Compare against zero instead of narrowing the 64-bit result into
      // a gboolean, which would turn values such as 4294967296 into FALSE
      return gst_amf_node_new_boolean (parsed != 0);
    }
    case 'Z':
      return gst_amf_node_new_null ();
    case 'O':
      // Unimplemented for now
      // Error: Unsupported
      // O:1 Starts the object, then we parse the other conn= until O:0
      // Then finish the object and serialize it
      g_set_error (error,
          GST_RTMP_CONN_PARSING_ERROR,
          GST_RTMP_CONN_PARSING_ERROR_UNSUPPORTED,
          "Objects are not yet supported");
      return NULL;
    default:
      g_set_error (error,
          GST_RTMP_CONN_PARSING_ERROR,
          GST_RTMP_CONN_PARSING_ERROR_INVALID_TYPE,
          "Invalid data type passed: %c", type);
      return NULL;
  }
}
// LIBRTMP(3) lets users attach arbitrary data to the RTMP connect packet
// through "conn=" parameters following the url; the format is described
// in LIBRTMP(3), section Connection Parameters.
//
// This parses such a string (e.g. "conn=S:Foo conn=B:1") and appends one
// AMF node per parameter to @array. Returns TRUE on success. On failure
// FALSE is returned with @error set; nodes appended before the failing
// parameter stay in @array.
static gboolean
parse_librtmp_style_conn_props (const gchar * connect_string, GPtrArray * array,
    GError ** error)
{
  gboolean success = TRUE;
  gchar **tokens;
  gsize idx;

  g_return_val_if_fail (error == NULL || *error == NULL, FALSE);

  // "conn=S:Foo conn=B:Bar" -> "", "S:Foo ", "B:Bar"
  tokens = g_strsplit (connect_string, "conn=", -1);

  for (idx = 0; tokens[idx] != NULL; idx++) {
    const gchar *token = g_strstrip (tokens[idx]);
    GError *token_error = NULL;
    GstAmfNode *amf_node;

    // Nothing between two separators
    if (token[0] == '\0') {
      continue;
    }

    // Named object field, e.g. 'NS:Foo:Bar': starts with 'N', followed by
    // the real type character and only then the colon
    if (token[0] == 'N' && token[1] != ':' && token[1] != '\0'
        && token[2] == ':') {
      // TODO: Error out if we had not found an object before
      // TODO: split the value and create the AMF node
      // then append it to the amf object, with the name
      g_set_error (error,
          GST_RTMP_CONN_PARSING_ERROR,
          GST_RTMP_CONN_PARSING_ERROR_UNSUPPORTED,
          "Objects are not yet supported");
      success = FALSE;
      break;
    }

    if (token[1] != ':') {
      g_set_error (error,
          GST_RTMP_CONN_PARSING_ERROR,
          GST_RTMP_CONN_PARSING_ERROR_INVALID_VALUE,
          "Parameter values are not separated by colon (:): %s",
          connect_string);
      success = FALSE;
      break;
    }

    // Plain token, e.g. 'S:Bar': [0] type prefix, [1] ':' separator,
    // [2..] the value
    amf_node = parse_conn_token (token[0], &token[2], &token_error);
    if (amf_node == NULL) {
      g_propagate_error (error, token_error);
      success = FALSE;
      break;
    }

    g_ptr_array_add (array, amf_node);
  }

  g_strfreev (tokens);

  return success;
}
/* Builds and sends the RTMP "connect" command for the task's location.
 *
 * The command object carries app/tcUrl (optionally extended with the adobe
 * auth query), flashVer and, when playing, the capability fields. Any
 * librtmp-style extra connect arguments are parsed and appended after the
 * command object. The reply is handled by send_connect_done ().
 *
 * On any validation or parsing failure the task is completed with an
 * error and unreffed; nothing is sent. */
static void
send_connect (GTask * task)
{
  ConnectTaskData *data = g_task_get_task_data (task);
  GPtrArray *arguments = g_ptr_array_new_with_free_func (gst_amf_node_free);
  GstAmfNode *node;
  const gchar *app, *flash_ver;
  gchar *uri, *appstr = NULL, *uristr = NULL;
  gboolean publish;

  /* Owned by this function until it is handed over to @arguments. */
  node = gst_amf_node_new_object ();
  app = data->location.application;
  flash_ver = data->location.flash_ver;
  publish = data->location.publish;
  uri = gst_rtmp_location_get_string (&data->location, FALSE);

  if (!app) {
    g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_NOT_INITIALIZED,
        "Application is not set");
    g_object_unref (task);
    goto out;
  }

  if (data->auth_query) {
    /* Second attempt: answer the server's auth challenge. */
    const gchar *query = data->auth_query;

    appstr = g_strdup_printf ("%s?%s", app, query);
    uristr = g_strdup_printf ("%s?%s", uri, query);
  } else if (data->location.authmod == GST_RTMP_AUTHMOD_ADOBE) {
    /* First attempt with adobe auth: announce the user to get a challenge. */
    const gchar *user = data->location.username;
    const gchar *authmod = "adobe";

    if (!user) {
      g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_PERMISSION_DENIED,
          "no username for adobe authentication");
      g_object_unref (task);
      goto out;
    }

    if (!data->location.password) {
      g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_PERMISSION_DENIED,
          "no password for adobe authentication");
      g_object_unref (task);
      goto out;
    }

    appstr = g_strdup_printf ("%s?authmod=%s&user=%s", app, authmod, user);
    uristr = g_strdup_printf ("%s?authmod=%s&user=%s", uri, authmod, user);
  } else {
    appstr = g_strdup (app);
    uristr = g_strdup (uri);
  }

  /* Arguments for the connect command.
   * Most of these are described in rtmp_specification_1.0.pdf page 30 */

  /* "The server application name the client is connected to." */
  gst_amf_node_append_field_take_string (node, "app", appstr, -1);

  if (publish) {
    /* Undocumented. Sent by both libavformat and librtmp. */
    gst_amf_node_append_field_string (node, "type", "nonprivate", -1);
  }

  if (flash_ver) {
    /* "Flash Player version. It is the same string as returned by the
     * ApplicationScript getversion () function." */
    gst_amf_node_append_field_string (node, "flashVer", flash_ver, -1);
  }

  /* "URL of the source SWF file making the connection."
   * XXX: libavformat sends "swfUrl" here, if provided. */

  /* "URL of the Server. It has the following format.
   * protocol://servername:port/appName/appInstance" */
  gst_amf_node_append_field_take_string (node, "tcUrl", uristr, -1);

  if (!publish) {
    /* "True if proxy is being used." */
    gst_amf_node_append_field_boolean (node, "fpad", FALSE);

    /* Undocumented. Sent by libavformat. */
    gst_amf_node_append_field_number (node, "capabilities",
        15 /* libavformat's magic number */ );

    /* "Indicates what audio codecs the client supports." */
    gst_amf_node_append_field_number (node, "audioCodecs",
        GST_RTMP_AUDIOCODECS);

    /* "Indicates what video codecs are supported." */
    gst_amf_node_append_field_number (node, "videoCodecs",
        GST_RTMP_VIDEOCODECS);

    /* "Indicates what special video functions are supported." */
    gst_amf_node_append_field_number (node, "videoFunction",
        GST_RTMP_VIDEOFUNCTION);

    /* "URL of the web page from where the SWF file was loaded."
     * XXX: libavformat sends "pageUrl" here, if provided. */
  }

  /* The array's free func now owns the command object. */
  g_ptr_array_add (arguments, node);
  node = NULL;

  /* Parse librtmp style connect parameters */
  if (data->location.extra_connect_args
      && data->location.extra_connect_args[0] != '\0') {
    GError *error = NULL;
    gboolean conn_result =
        parse_librtmp_style_conn_props (data->location.extra_connect_args,
        arguments,
        &error);

    /* Failed to parse the connect-args prop */
    if (!conn_result) {
      g_task_return_new_error (task, error->domain, error->code,
          "Failed to parse extra connection args: %s", error->message);
      g_clear_error (&error);
      /* Like every other failure path, give up our task reference. */
      g_object_unref (task);
      goto out;
    }
  }

  gst_rtmp_connection_send_command_with_args (data->connection,
      send_connect_done, task, 0, "connect", arguments->len,
      (const GstAmfNode **) arguments->pdata);

out:
  /* Only still set when we bailed out before handing it to @arguments. */
  if (node) {
    gst_amf_node_free (node);
  }
  g_ptr_array_free (arguments, TRUE);
  g_free (uri);
}
/* Reply handler for the "connect" command.
 *
 * @args holds the reply arguments, or is NULL when the command failed;
 * @user_data is the GTask driving the connect sequence.
 *
 * - NetConnection.Connect.Success: continue with the secureToken step.
 * - NetConnection.Connect.Rejected: if the description asks for adobe
 *   authentication, either switch authmod (auto mode) or compute the auth
 *   query from the server's salt/opaque/challenge, then reconnect.
 * - Anything else fails the task.
 *
 * Every path that fails the task also drops the task reference. */
static void
send_connect_done (const gchar * command_name, GPtrArray * args,
    gpointer user_data)
{
  GTask *task = G_TASK (user_data);
  ConnectTaskData *data = g_task_get_task_data (task);
  const GstAmfNode *node, *optional_args;
  const gchar *code;

  if (g_task_return_error_if_cancelled (task)) {
    g_object_unref (task);
    return;
  }

  if (!args) {
    g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_FAILED,
        "'connect' cmd failed: %s", command_name);
    g_object_unref (task);
    return;
  }

  if (args->len < 2) {
    g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_FAILED,
        "'connect' cmd failed; not enough return arguments");
    g_object_unref (task);
    return;
  }

  /* The second reply argument carries the status fields. */
  optional_args = g_ptr_array_index (args, 1);

  node = gst_amf_node_get_field (optional_args, "code");
  code = node ? gst_amf_node_peek_string (node, NULL) : NULL;
  if (!code) {
    g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_FAILED,
        "'connect' cmd failed; no status code");
    g_object_unref (task);
    return;
  }

  GST_INFO ("connect result: %s", code);

  if (g_str_equal (code, "NetConnection.Connect.Success")) {
    node = gst_amf_node_get_field (optional_args, "secureToken");
    send_secure_token_response (task, data->connection,
        node ? gst_amf_node_peek_string (node, NULL) : NULL);
    return;
  }

  if (g_str_equal (code, "NetConnection.Connect.Rejected")) {
    GstRtmpAuthmod authmod = data->location.authmod;
    GMatchInfo *match_info;
    const gchar *desc;
    GstUri *query;

    node = gst_amf_node_get_field (optional_args, "description");
    desc = node ? gst_amf_node_peek_string (node, NULL) : NULL;
    if (!desc) {
      g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_PERMISSION_DENIED,
          "'connect' cmd returned '%s'; no description", code);
      g_object_unref (task);
      return;
    }

    GST_DEBUG ("connect result desc: %s", desc);

    if (authmod == GST_RTMP_AUTHMOD_AUTO && strstr (desc, "code=403 need auth")) {
      if (strstr (desc, "authmod=adobe")) {
        GST_INFO ("Reconnecting with authmod=adobe");
        data->location.authmod = GST_RTMP_AUTHMOD_ADOBE;
        socket_connect (task);
        return;
      }

      g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_PERMISSION_DENIED,
          "'connect' cmd returned unhandled authmod: %s", desc);
      g_object_unref (task);
      return;
    }

    if (!g_regex_match (auth_regex, desc, 0, &match_info)) {
      /* GLib creates the match info even when nothing matched, so it has
       * to be released on this path as well. */
      g_match_info_free (match_info);
      g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_PERMISSION_DENIED,
          "'connect' cmd returned '%s': %s", code, desc);
      g_object_unref (task);
      return;
    }

    {
      gchar *authmod_str = g_match_info_fetch_named (match_info, "authmod");
      gchar *query_str = g_match_info_fetch_named (match_info, "query");
      gboolean matches;

      GST_INFO ("regex parsed auth: authmod=%s, query=%s",
          GST_STR_NULL (authmod_str), GST_STR_NULL (query_str));
      g_match_info_free (match_info);

      /* Only the adobe mode can be answered here. */
      switch (authmod) {
        case GST_RTMP_AUTHMOD_ADOBE:
          matches = g_str_equal (authmod_str, "adobe");
          break;

        default:
          matches = FALSE;
          break;
      }

      if (!matches) {
        g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_PERMISSION_DENIED,
            "server uses wrong authentication mode '%s'; expected %s",
            GST_STR_NULL (authmod_str), gst_rtmp_authmod_get_nick (authmod));
        g_object_unref (task);
        g_free (authmod_str);
        g_free (query_str);
        return;
      }
      g_free (authmod_str);

      /* The captured "?key=value&..." text is parsed as a URI so its
       * query values can be looked up by name. */
      query = gst_uri_from_string (query_str);
      if (!query) {
        g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_PERMISSION_DENIED,
            "failed to parse authentication query '%s'",
            GST_STR_NULL (query_str));
        g_object_unref (task);
        g_free (query_str);
        return;
      }
      g_free (query_str);
    }

    {
      const gchar *reason = gst_uri_get_query_value (query, "reason");

      if (!reason) {
        g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_PERMISSION_DENIED,
            "authentication failed; no reason: %s", desc);
        g_object_unref (task);
        gst_uri_unref (query);
        return;
      }

      if (g_str_equal (reason, "authfailed")) {
        g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_PERMISSION_DENIED,
            "authentication failed; wrong credentials?: %s", desc);
        g_object_unref (task);
        gst_uri_unref (query);
        return;
      }

      /* "needauth" is the only reason we can act on. */
      if (!g_str_equal (reason, "needauth")) {
        g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_PERMISSION_DENIED,
            "authentication failed; reason '%s': %s", reason, desc);
        g_object_unref (task);
        gst_uri_unref (query);
        return;
      }
    }

    {
      const gchar *salt, *opaque, *challenge;

      salt = gst_uri_get_query_value (query, "salt");
      if (!salt) {
        g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_PERMISSION_DENIED,
            "salt missing from auth request: %s", desc);
        g_object_unref (task);
        gst_uri_unref (query);
        return;
      }

      opaque = gst_uri_get_query_value (query, "opaque");
      challenge = gst_uri_get_query_value (query, "challenge");

      g_warn_if_fail (!data->auth_query);
      /* Should a query from an earlier round exist, do not leak it. */
      g_free (data->auth_query);
      data->auth_query = do_adobe_auth (data->location.username,
          data->location.password, salt, opaque, challenge);
    }

    gst_uri_unref (query);

    if (!data->auth_query) {
      /* do_adobe_auth should not fail; send_connect tests if username
       * and password are provided */
      g_warn_if_reached ();
      g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_FAILED,
          "internal error: failed to generate adobe auth query");
      g_object_unref (task);
      return;
    }

    /* Reconnect; send_connect () will now include the auth query. */
    socket_connect (task);
    return;
  }

  g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_FAILED,
      "'connect' cmd returned '%s'", code);
  g_object_unref (task);
}
/* prep key: pack the first 16 chars of @key into 4 little-endian ints.
 * Keys shorter than 16 chars are zero-padded. */
static void
rtmp_tea_decode_prep_key (const gchar * key, guint32 out[4])
{
  gchar padded[17];
  gsize word;

  g_return_if_fail (key);
  g_return_if_fail (out);

  /* strncpy zero-fills the remainder, so 16 bytes are always readable */
  strncpy (padded, key, 16);

  /* placate GCC 8 -Wstringop-truncation */
  padded[16] = 0;

  for (word = 0; word < 4; word++) {
    out[word] = GST_READ_UINT32_LE (padded + 4 * word);
  }
}
/* prep text: hex2bin, each 8 digits -> 4 chars -> 1 uint32.
 * A trailing group shorter than 8 digits is zero-padded, and characters
 * that are not hex digits count as zero.
 * Returns a new zero-terminated GArray of guint32 owned by the caller. */
static GArray *
rtmp_tea_decode_prep_text (const gchar * text)
{
  GArray *words;
  gsize text_len, offset;

  g_return_val_if_fail (text, NULL);

  text_len = strlen (text);
  words = g_array_sized_new (TRUE, TRUE, 4, (text_len + 7) / 8);

  for (offset = 0; offset < text_len; offset += 8) {
    gchar group[9];
    guchar bytes[4];
    guint32 word;
    gsize pos;

    /* strncpy zero-fills the remainder, so 8 bytes are always readable */
    strncpy (group, text + offset, 8);

    /* placate GCC 8 -Wstringop-truncation */
    group[8] = 0;

    for (pos = 0; pos < 4; pos++) {
      gint high = g_ascii_xdigit_value (group[2 * pos]);
      gint low = g_ascii_xdigit_value (group[2 * pos + 1]);
      gint combined = 0;

      /* g_ascii_xdigit_value () yields -1 for non-hex characters */
      if (high > 0)
        combined += high << 4;
      if (low > 0)
        combined += low;

      bytes[pos] = combined;
    }

    word = GST_READ_UINT32_LE (bytes);
    g_array_append_val (words, word);
  }

  return words;
}
/* return text from uint32s to chars.
 * Consumes @arr: the GArray wrapper is freed and its data is returned as
 * a string owned by the caller. On non-little-endian hosts the words are
 * first converted to little-endian byte order in place. */
static gchar *
rtmp_tea_decode_return_text (GArray * arr)
{
#if G_BYTE_ORDER != G_LITTLE_ENDIAN
  gsize idx;

  g_return_val_if_fail (arr, NULL);

  for (idx = 0; idx < arr->len; idx++) {
    guint32 *word = &g_array_index (arr, guint32, idx);

    *word = GUINT32_TO_LE (*word);
  }
#endif

  /* array is already zero-terminated */
  return g_array_free (arr, FALSE);
}
/* http://www.movable-type.co.uk/scripts/tea-block.html
 *
 * Decodes @text (an array of guint32 words) in place using the four-word
 * @key. Requires a non-empty @text; otherwise it returns without touching
 * the data. */
static void
rtmp_tea_decode_btea (GArray * text, guint32 key[4])
{
  guint32 *v, n, *k;
  guint32 z, y, sum = 0, e, DELTA = 0x9e3779b9;
  guint32 p, q;

  g_return_if_fail (text);
  g_return_if_fail (text->len > 0);
  g_return_if_fail (key);

  v = (guint32 *) text->data;
  n = text->len;
  k = key;
  z = v[n - 1];
  y = v[0];
  /* number of rounds; n > 0 is guaranteed by the check above */
  q = 6 + 52 / n;
  sum = q * DELTA;

/* Mixing term; reads z, y, sum, k, p and e from the enclosing scope.
 * Note that the expansion ends in its own semicolon. */
#define MX ((z>>5^y<<2) + (y>>3^z<<4)) ^ ((sum^y) + (k[(p&3)^e]^z));

  /* one iteration per round, counting sum down to zero */
  while (sum != 0) {
    e = sum >> 2 & 3;
    /* walk the words from last to second */
    for (p = n - 1; p > 0; p--)
      z = v[p - 1], y = v[p] -= MX;

    /* first word wraps around to the last one; p is 0 here, which MX uses */
    z = v[n - 1];
    y = v[0] -= MX;
    sum -= DELTA;
  }

#undef MX
}
/* taken from librtmp
 *
 * Decodes the hex-encoded @hex_text with the key derived from the first
 * 16 chars of @bin_key. Returns a newly allocated string. */
static gchar *
rtmp_tea_decode (const gchar * bin_key, const gchar * hex_text)
{
  guint32 key_words[4];
  GArray *words;

  rtmp_tea_decode_prep_key (bin_key, key_words);
  words = rtmp_tea_decode_prep_text (hex_text);

  /* decode in place, then turn the word array into the result string */
  rtmp_tea_decode_btea (words, key_words);

  return rtmp_tea_decode_return_text (words);
}
/* Final step of the connect sequence.
 * When the server supplied a secureToken @challenge, it is decoded with
 * the location's secure_token and sent back as "secureTokenResponse"; a
 * missing or empty token fails the task. Afterwards the error handler is
 * detached and the task is completed with a new reference to
 * @connection. Consumes the task reference. */
static void
send_secure_token_response (GTask * task, GstRtmpConnection * connection,
    const gchar * challenge)
{
  ConnectTaskData *data = g_task_get_task_data (task);

  if (challenge) {
    const gchar *token = data->location.secure_token;
    GstAmfNode *null_node, *response_node;
    gchar *decoded;

    if (token == NULL || token[0] == '\0') {
      g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_PERMISSION_DENIED,
          "server requires secureToken but no token provided");
      g_object_unref (task);
      return;
    }

    decoded = rtmp_tea_decode (token, challenge);

    GST_DEBUG ("response: %s", decoded);

    null_node = gst_amf_node_new_null ();
    /* the string node takes ownership of the decoded response */
    response_node = gst_amf_node_new_take_string (decoded, -1);
    gst_rtmp_connection_send_command (connection, NULL, NULL, 0,
        "secureTokenResponse", null_node, response_node, NULL);
    gst_amf_node_free (null_node);
    gst_amf_node_free (response_node);
  }

  /* From here on the caller is responsible for connection errors. */
  g_signal_handler_disconnect (connection, data->error_handler_id);
  data->error_handler_id = 0;

  g_task_return_pointer (task, g_object_ref (connection),
      gst_rtmp_connection_close_and_unref);
  g_object_unref (task);
}
/* Collect the outcome of an asynchronous connect.
 *
 * @result is treated as a GTask and its pointer result is propagated to the
 * caller; on failure @error is filled in by g_task_propagate_pointer(). */
GstRtmpConnection *
gst_rtmp_client_connect_finish (GAsyncResult * result, GError ** error)
{
  return g_task_propagate_pointer (G_TASK (result), error);
}
/* Forward declarations of the stream start-up steps: start_stream() and
 * create_stream_done() call these before their definitions appear. */
static void send_create_stream (GTask * task);
static void send_publish_or_play (GTask * task);
/* State attached to the GTask that drives starting a publish or play
 * stream; created by stream_task_data_new() and released by
 * stream_task_data_free(). */
typedef struct
{
/* connection the stream is started on; a reference is held */
GstRtmpConnection *connection;
/* handler ID of the "error" signal connection, 0 when not connected */
gulong error_handler_id;
/* private copy of the stream name */
gchar *stream;
/* TRUE when publishing, FALSE when playing */
gboolean publish;
/* stream ID taken from the 'createStream' result */
guint32 id;
} StreamTaskData;
/* Allocate zero-initialised task data for starting @stream on @connection.
 * A reference to @connection is taken and @stream is copied; release the
 * result with stream_task_data_free(). */
static StreamTaskData *
stream_task_data_new (GstRtmpConnection * connection, const gchar * stream,
    gboolean publish)
{
  StreamTaskData *self = g_new0 (StreamTaskData, 1);

  self->publish = publish;
  self->stream = g_strdup (stream);
  self->connection = g_object_ref (connection);

  return self;
}
/* Destroy a StreamTaskData (GDestroyNotify-compatible).
 * A still-connected "error" handler is disconnected before the connection
 * reference is dropped. */
static void
stream_task_data_free (gpointer ptr)
{
  StreamTaskData *self = ptr;

  if (self->error_handler_id != 0) {
    g_signal_handler_disconnect (self->connection, self->error_handler_id);
  }

  g_clear_object (&self->connection);
  g_clear_pointer (&self->stream, g_free);
  g_free (self);
}
/* Common entry point for starting a publish (@publish TRUE) or play
 * (@publish FALSE) stream.
 *
 * A GTask is created with @connection as source object. If @stream is NULL
 * the task fails immediately with G_IO_ERROR_NOT_INITIALIZED; otherwise the
 * task data is set up, the connection's "error" signal is hooked up with the
 * task as user data, and the 'createStream' step is kicked off. */
static void
start_stream (GstRtmpConnection * connection, const gchar * stream,
    gboolean publish, GCancellable * cancellable,
    GAsyncReadyCallback callback, gpointer user_data)
{
  GTask *task;
  StreamTaskData *task_data;

  init_debug ();

  task = g_task_new (connection, cancellable, callback, user_data);

  if (stream == NULL) {
    g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_NOT_INITIALIZED,
        "Stream is not set");
    g_object_unref (task);
    return;
  }

  task_data = stream_task_data_new (connection, stream, publish);
  task_data->error_handler_id = g_signal_connect (connection,
      "error", G_CALLBACK (connection_error), task);
  /* the task owns the data from here on */
  g_task_set_task_data (task, task_data, stream_task_data_free);

  send_create_stream (task);
}
/* Asynchronously start publishing @stream on @connection.
 *
 * Delegates to start_stream() with publish set to TRUE; @cancellable,
 * @callback and @user_data are passed through unchanged. Collect the result
 * with gst_rtmp_client_start_publish_finish(). */
void
gst_rtmp_client_start_publish_async (GstRtmpConnection * connection,
const gchar * stream, GCancellable * cancellable,
GAsyncReadyCallback callback, gpointer user_data)
{
start_stream (connection, stream, TRUE, cancellable, callback, user_data);
}
/* Asynchronously start playing @stream on @connection.
 *
 * Delegates to start_stream() with publish set to FALSE; @cancellable,
 * @callback and @user_data are passed through unchanged. Collect the result
 * with gst_rtmp_client_start_play_finish(). */
void
gst_rtmp_client_start_play_async (GstRtmpConnection * connection,
const gchar * stream, GCancellable * cancellable,
GAsyncReadyCallback callback, gpointer user_data)
{
start_stream (connection, stream, FALSE, cancellable, callback, user_data);
}
/* Queue a "set buffer length" user control message on @connection.
 * @stream is carried as the first parameter and @ms as the second. */
static void
send_set_buffer_length (GstRtmpConnection * connection, guint32 stream,
    guint32 ms)
{
  GstRtmpUserControl control = {
    .type = GST_RTMP_USER_CONTROL_TYPE_SET_BUFFER_LENGTH,
    .param = stream,
    .param2 = ms,
  };

  gst_rtmp_connection_queue_message (connection,
      gst_rtmp_message_new_user_control (&control));
}
/* First step of starting a stream: send the preparatory commands and then
 * 'createStream'. The reply is delivered to create_stream_done() with @task
 * as user data.
 *
 * When publishing, 'releaseStream' and 'FCPublish' are sent first; when
 * playing, a window size request and a buffer length message are sent
 * instead, mirroring librtmp. */
static void
send_create_stream (GTask * task)
{
  StreamTaskData *data = g_task_get_task_data (task);
  GstRtmpConnection *connection = g_task_get_source_object (task);
  GstAmfNode *null_object = gst_amf_node_new_null ();
  GstAmfNode *name_node = gst_amf_node_new_string (data->stream, -1);

  if (!data->publish) {
    /* Matches librtmp */
    gst_rtmp_connection_request_window_size (connection,
        GST_RTMP_DEFAULT_WINDOW_ACK_SIZE);
    send_set_buffer_length (connection, 0, 300);
  } else {
    /* Not part of RTMP documentation */
    GST_DEBUG ("Releasing stream '%s'", data->stream);
    gst_rtmp_connection_send_command (connection, NULL, NULL, 0,
        "releaseStream", null_object, name_node, NULL);
    gst_rtmp_connection_send_command (connection, NULL, NULL, 0,
        "FCPublish", null_object, name_node, NULL);
  }

  GST_INFO ("Creating stream '%s'", data->stream);
  gst_rtmp_connection_send_command (connection, create_stream_done, task, 0,
      "createStream", null_object, NULL);

  gst_amf_node_free (name_node);
  gst_amf_node_free (null_object);
}
/* Reply handler for the 'createStream' command.
 *
 * @command_name: name of the reply command, used in error messages
 * @args: reply arguments, or NULL when the command failed; the stream ID is
 *   expected as a number at index 1
 * @user_data: the GTask driving the stream start
 *
 * On success the stream ID is stored in the task data and the publish/play
 * command is sent. On any failure the task is completed with an error and
 * the task reference is dropped. */
static void
create_stream_done (const gchar * command_name, GPtrArray * args,
    gpointer user_data)
{
  GTask *task = G_TASK (user_data);
  StreamTaskData *data = g_task_get_task_data (task);
  GstAmfNode *result;
  double stream_number;

  if (g_task_return_error_if_cancelled (task)) {
    g_object_unref (task);
    return;
  }

  if (!args) {
    g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_FAILED,
        "'createStream' cmd failed: %s", command_name);
    g_object_unref (task);
    return;
  }

  if (args->len < 2) {
    g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_FAILED,
        "'createStream' cmd failed; not enough return arguments");
    g_object_unref (task);
    return;
  }

  result = g_ptr_array_index (args, 1);
  if (gst_amf_node_get_type (result) != GST_AMF_TYPE_NUMBER) {
    GString *error_dump = g_string_new ("");

    gst_amf_node_dump (result, -1, error_dump);
    g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_FAILED,
        "'createStream' cmd failed: %s: %s", command_name, error_dump->str);
    g_object_unref (task);
    g_string_free (error_dump, TRUE);
    return;
  }

  /* The number comes from the server. Converting a floating-point value whose
   * integral part does not fit the target integer type is undefined
   * behaviour, so reject anything outside (-1, 2^32) before converting. The
   * negated form also rejects NaN. */
  stream_number = gst_amf_node_get_number (result);
  if (!(stream_number > -1.0 && stream_number < 4294967296.0)) {
    g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_INVALID_DATA,
        "'createStream' cmd returned out-of-range stream ID %g",
        stream_number);
    g_object_unref (task);
    return;
  }

  data->id = (guint32) stream_number;
  GST_INFO ("createStream success, stream_id=%" G_GUINT32_FORMAT, data->id);

  if (data->id == 0) {
    g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_INVALID_DATA,
        "'createStream' cmd returned ID 0");
    g_object_unref (task);
    return;
  }

  send_publish_or_play (task);
}
/* Second step of starting a stream: send 'publish' or 'play' on the stream
 * ID obtained from 'createStream', and register on_publish_or_play_status()
 * for the 'onStatus' command on that stream with @task as user data. */
static void
send_publish_or_play (GTask * task)
{
  StreamTaskData *data = g_task_get_task_data (task);
  GstRtmpConnection *connection = g_task_get_source_object (task);
  const gboolean publishing = data->publish;
  const gchar *command;
  GstAmfNode *null_object, *name_node, *mode_node;

  null_object = gst_amf_node_new_null ();
  name_node = gst_amf_node_new_string (data->stream, -1);

  if (publishing) {
    command = "publish";
    /* publishing type (live, record, append) */
    mode_node = gst_amf_node_new_string ("live", -1);
  } else {
    command = "play";
    /* "Start" argument: -2 = live or recording, -1 = only live
       0 or positive = only recording, seek to X seconds */
    mode_node = gst_amf_node_new_number (-2);
  }

  GST_INFO ("Sending %s for '%s' on stream %" G_GUINT32_FORMAT,
      command, data->stream, data->id);

  /* Register for the status reply before the command goes out */
  gst_rtmp_connection_expect_command (connection, on_publish_or_play_status,
      task, data->id, "onStatus");
  gst_rtmp_connection_send_command (connection, NULL, NULL, data->id,
      command, null_object, name_node, mode_node, NULL);

  if (!publishing) {
    /* Matches librtmp */
    send_set_buffer_length (connection, data->id, 30000);
  }

  gst_amf_node_free (null_object);
  gst_amf_node_free (name_node);
  gst_amf_node_free (mode_node);
}
/* Handler for the 'onStatus' command that answers 'publish' or 'play'.
 *
 * @command_name: name of the received command, used in error messages
 * @args: command arguments, or NULL on failure; the info object is expected
 *   at index 1 and its "code" field decides the outcome
 * @user_data: the GTask driving the stream start
 *
 * The task is completed with TRUE for the known success codes and with a
 * specific error for the known failure codes; any other code produces a
 * generic G_IO_ERROR_FAILED. Once a status has been evaluated the "error"
 * signal handler is disconnected. The task reference is dropped on every
 * path. */
static void
on_publish_or_play_status (const gchar * command_name, GPtrArray * args,
    gpointer user_data)
{
  GTask *task = G_TASK (user_data);
  GstRtmpConnection *connection = g_task_get_source_object (task);
  StreamTaskData *data = g_task_get_task_data (task);
  const gchar *verb = data->publish ? "publish" : "play";
  const gchar *status_code = NULL;
  const GstAmfNode *info, *code_node;
  GString *dump;
  gboolean recognized = TRUE;

  if (g_task_return_error_if_cancelled (task)) {
    g_object_unref (task);
    return;
  }

  if (args == NULL) {
    g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_FAILED,
        "'%s' cmd failed: %s", verb, command_name);
    g_object_unref (task);
    return;
  }

  if (args->len < 2) {
    g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_FAILED,
        "'%s' cmd failed; not enough return arguments", verb);
    g_object_unref (task);
    return;
  }

  info = g_ptr_array_index (args, 1);
  code_node = gst_amf_node_get_field (info, "code");
  if (code_node != NULL) {
    status_code = gst_amf_node_peek_string (code_node, NULL);
  }

  /* Textual dump of the whole info object for log and error messages */
  dump = g_string_new ("");
  gst_amf_node_dump (info, -1, dump);

  if (data->publish) {
    if (g_strcmp0 (status_code, "NetStream.Publish.Start") == 0) {
      GST_INFO ("publish success: %s", dump->str);
      g_task_return_boolean (task, TRUE);
    } else if (g_strcmp0 (status_code, "NetStream.Publish.BadName") == 0) {
      g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_EXISTS,
          "publish denied; stream already exists: %s", dump->str);
    } else if (g_strcmp0 (status_code, "NetStream.Publish.Denied") == 0) {
      g_task_return_new_error (task, G_IO_ERROR,
          G_IO_ERROR_PERMISSION_DENIED, "publish denied: %s", dump->str);
    } else {
      recognized = FALSE;
    }
  } else {
    if (g_strcmp0 (status_code, "NetStream.Play.Start") == 0 ||
        g_strcmp0 (status_code, "NetStream.Play.PublishNotify") == 0 ||
        g_strcmp0 (status_code, "NetStream.Play.Reset") == 0) {
      GST_INFO ("play success: %s", dump->str);
      g_task_return_boolean (task, TRUE);
    } else if (g_strcmp0 (status_code, "NetStream.Play.StreamNotFound") == 0) {
      g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_NOT_FOUND,
          "play denied; stream not found: %s", dump->str);
    } else {
      recognized = FALSE;
    }
  }

  if (!recognized) {
    g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_FAILED,
        "'%s' cmd failed: %s: %s", verb, command_name, dump->str);
  }

  g_string_free (dump, TRUE);

  g_signal_handler_disconnect (connection, data->error_handler_id);
  data->error_handler_id = 0;

  g_object_unref (task);
}
/* Shared finish implementation for the publish and play start operations.
 *
 * Returns FALSE (with @error set by the task) when the operation failed.
 * On success returns TRUE and, if @stream_id is not NULL, stores the stream
 * ID recorded in the task data there. */
static gboolean
start_stream_finish (GstRtmpConnection * connection,
    GAsyncResult * result, guint32 * stream_id, GError ** error)
{
  GTask *task;

  g_return_val_if_fail (g_task_is_valid (result, connection), FALSE);

  task = G_TASK (result);
  if (!g_task_propagate_boolean (task, error)) {
    return FALSE;
  }

  if (stream_id != NULL) {
    StreamTaskData *data = g_task_get_task_data (task);

    *stream_id = data->id;
  }

  return TRUE;
}
/* Finish an operation started with gst_rtmp_client_start_publish_async().
 *
 * Thin wrapper around start_stream_finish(): returns its result, which is
 * FALSE with @error set on failure; on success @stream_id (if not NULL)
 * receives the stream ID. */
gboolean
gst_rtmp_client_start_publish_finish (GstRtmpConnection * connection,
GAsyncResult * result, guint32 * stream_id, GError ** error)
{
return start_stream_finish (connection, result, stream_id, error);
}
/* Finish an operation started with gst_rtmp_client_start_play_async().
 *
 * Thin wrapper around start_stream_finish(): returns its result, which is
 * FALSE with @error set on failure; on success @stream_id (if not NULL)
 * receives the stream ID. */
gboolean
gst_rtmp_client_start_play_finish (GstRtmpConnection * connection,
GAsyncResult * result, guint32 * stream_id, GError ** error)
{
return start_stream_finish (connection, result, stream_id, error);
}
/* Send the stop commands selected by @stop_commands for @stream on
 * @connection. Public wrapper that forwards all arguments to send_stop(). */
void
gst_rtmp_client_stop_publish (GstRtmpConnection * connection,
const gchar * stream, const GstRtmpStopCommands stop_commands)
{
send_stop (connection, stream, stop_commands);
}
/* Send each stop command whose flag is set in @stop_commands, in the fixed
 * order 'FCUnpublish', 'closeStream', 'deleteStream'. Every command carries
 * a null command object followed by the stream name, and no reply callback
 * is registered. */
static void
send_stop (GstRtmpConnection * connection, const gchar * stream,
    const GstRtmpStopCommands stop_commands)
{
  GstAmfNode *null_object = gst_amf_node_new_null ();
  GstAmfNode *name_node = gst_amf_node_new_string (stream, -1);

  if ((stop_commands & GST_RTMP_STOP_COMMANDS_FCUNPUBLISH) != 0) {
    GST_DEBUG ("Sending stop command 'FCUnpublish' for stream '%s'", stream);
    gst_rtmp_connection_send_command (connection, NULL, NULL, 0,
        "FCUnpublish", null_object, name_node, NULL);
  }

  if ((stop_commands & GST_RTMP_STOP_COMMANDS_CLOSE_STREAM) != 0) {
    GST_DEBUG ("Sending stop command 'closeStream' for stream '%s'", stream);
    gst_rtmp_connection_send_command (connection, NULL, NULL, 0,
        "closeStream", null_object, name_node, NULL);
  }

  if ((stop_commands & GST_RTMP_STOP_COMMANDS_DELETE_STREAM) != 0) {
    GST_DEBUG ("Sending stop command 'deleteStream' for stream '%s'", stream);
    gst_rtmp_connection_send_command (connection, NULL, NULL, 0,
        "deleteStream", null_object, name_node, NULL);
  }

  gst_amf_node_free (name_node);
  gst_amf_node_free (null_object);
}