gstreamer/gst/rtmp2/rtmp/rtmpconnection.c
Matthew Waters 1f7515100c rtmp2/connection: pass the parent cancellable down to the connection
Otherwise, when rtpm2src cancels an inflight operation that has a queued
message stored, then the rtmp connection operation is not stopped.

If the cancellation occurs during rtmp connection start up, then
rtpm2src does not have any way of accessing the connection object as it
has not been returned yet.  As a result, rtpm2src will cancel, the
connection will still be processing things and the
GMainContext/GMainLoop associated with the outstanding operation will be
destroyed.  All outstanding operations and the rtmpconnection object will
therefore be leaked in this case.

Fixes: https://gitlab.freedesktop.org/gstreamer/gst-plugins-bad/-/issues/1425
Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-bad/-/merge_requests/1862>
2020-12-08 23:43:02 +00:00

1381 lines
40 KiB
C

/* GStreamer RTMP Library
* Copyright (C) 2013 David Schleef <ds@schleef.org>
* Copyright (C) 2017 Make.TV, Inc. <info@make.tv>
* Contact: Jan Alexander Steffens (heftig) <jsteffens@make.tv>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
* Boston, MA 02110-1335, USA.
*/
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#include <gst/gst.h>
#include <string.h>
#include <math.h>
#include "rtmpconnection.h"
#include "rtmpchunkstream.h"
#include "rtmpmessage.h"
#include "rtmputils.h"
#include "amf.h"
GST_DEBUG_CATEGORY_STATIC (gst_rtmp_connection_debug_category);
#define GST_CAT_DEFAULT gst_rtmp_connection_debug_category
#define READ_SIZE 8192
typedef void (*GstRtmpConnectionCallback) (GstRtmpConnection * connection);
struct _GstRtmpConnection
{
GObject parent_instance;
/* should be properties */
gboolean input_paused;
gboolean error;
/* private */
GThread *thread;
GSocketConnection *connection;
GCancellable *cancellable;
GSocketClient *socket_client;
GAsyncQueue *output_queue;
GMainContext *main_context;
GSource *input_source;
GByteArray *input_bytes;
guint input_needed_bytes;
GstRtmpChunkStreams *input_streams, *output_streams;
GList *transactions;
GList *expected_commands;
guint transaction_count;
GstRtmpConnectionMessageFunc input_handler;
gpointer input_handler_user_data;
GDestroyNotify input_handler_user_data_destroy;
GstRtmpConnectionFunc output_handler;
gpointer output_handler_user_data;
GDestroyNotify output_handler_user_data_destroy;
gboolean writing;
/* Protects the values below during concurrent access.
* - Taken by the loop thread when writing, but not reading.
* - Taken by other threads when reading (calling get_stats).
*/
GMutex stats_lock;
/* RTMP configuration */
guint32 in_chunk_size;
guint32 out_chunk_size, out_chunk_size_pending;
guint32 in_window_ack_size;
guint32 out_window_ack_size, out_window_ack_size_pending;
guint64 in_bytes_total;
guint64 out_bytes_total;
guint64 in_bytes_acked;
guint64 out_bytes_acked;
};
typedef struct
{
GObjectClass parent_class;
} GstRtmpConnectionClass;
/* prototypes */
static void gst_rtmp_connection_dispose (GObject * object);
static void gst_rtmp_connection_finalize (GObject * object);
static void gst_rtmp_connection_emit_error (GstRtmpConnection * self);
static gboolean gst_rtmp_connection_input_ready (GInputStream * is,
gpointer user_data);
static void gst_rtmp_connection_start_write (GstRtmpConnection * self);
static void gst_rtmp_connection_write_buffer_done (GObject * obj,
GAsyncResult * result, gpointer user_data);
static void gst_rtmp_connection_start_read (GstRtmpConnection * sc,
guint needed_bytes);
static void gst_rtmp_connection_try_read (GstRtmpConnection * sc);
static void gst_rtmp_connection_do_read (GstRtmpConnection * sc);
static void gst_rtmp_connection_handle_aggregate (GstRtmpConnection *
connection, GstBuffer * buffer);
static void gst_rtmp_connection_handle_protocol_control (GstRtmpConnection *
connection, GstBuffer * buffer);
static void gst_rtmp_connection_handle_cm (GstRtmpConnection * connection,
GstBuffer * buffer);
static void gst_rtmp_connection_handle_user_control (GstRtmpConnection * sc,
GstBuffer * buffer);
static void gst_rtmp_connection_handle_message (GstRtmpConnection * sc,
GstBuffer * buffer);
static void gst_rtmp_connection_handle_set_chunk_size (GstRtmpConnection * self,
guint32 in_chunk_size);
static void gst_rtmp_connection_handle_ack (GstRtmpConnection * self,
guint32 bytes);
static void gst_rtmp_connection_handle_window_ack_size (GstRtmpConnection *
self, guint32 in_chunk_size);
static void gst_rtmp_connection_send_ack (GstRtmpConnection * connection);
static void
gst_rtmp_connection_send_ping_response (GstRtmpConnection * connection,
guint32 event_data);
static gboolean
gst_rtmp_connection_prepare_protocol_control (GstRtmpConnection * self,
GstBuffer * buffer);
static void
gst_rtmp_connection_apply_protocol_control (GstRtmpConnection * self);
typedef struct
{
gdouble transaction_id;
GstRtmpCommandCallback func;
gpointer user_data;
} Transaction;
static Transaction *
transaction_new (gdouble transaction_id, GstRtmpCommandCallback func,
gpointer user_data)
{
Transaction *data = g_slice_new (Transaction);
data->transaction_id = transaction_id;
data->func = func;
data->user_data = user_data;
return data;
}
static void
transaction_free (gpointer ptr)
{
Transaction *data = ptr;
g_slice_free (Transaction, data);
}
typedef struct
{
guint32 stream_id;
gchar *command_name;
GstRtmpCommandCallback func;
gpointer user_data;
} ExpectedCommand;
static ExpectedCommand *
expected_command_new (guint32 stream_id, const gchar * command_name,
GstRtmpCommandCallback func, gpointer user_data)
{
ExpectedCommand *data = g_slice_new (ExpectedCommand);
data->stream_id = stream_id;
data->command_name = g_strdup (command_name);
data->func = func;
data->user_data = user_data;
return data;
}
static void
expected_command_free (gpointer ptr)
{
ExpectedCommand *data = ptr;
g_free (data->command_name);
g_slice_free (ExpectedCommand, data);
}
enum
{
SIGNAL_ERROR,
SIGNAL_STREAM_CONTROL,
N_SIGNALS
};
static guint signals[N_SIGNALS] = { 0, };
/* singletons */
static GstMemory *set_data_frame_value;
static void
init_set_data_frame_value (void)
{
GstAmfNode *node = gst_amf_node_new_string ("@setDataFrame", -1);
GBytes *bytes = gst_amf_node_serialize (node);
gsize size;
const gchar *data = g_bytes_get_data (bytes, &size);
set_data_frame_value = gst_memory_new_wrapped (GST_MEMORY_FLAG_READONLY,
(gpointer) data, size, 0, size, bytes, (GDestroyNotify) g_bytes_unref);
GST_MINI_OBJECT_FLAG_SET (set_data_frame_value,
GST_MINI_OBJECT_FLAG_MAY_BE_LEAKED);
gst_amf_node_free (node);
}
/* class initialization */
G_DEFINE_TYPE_WITH_CODE (GstRtmpConnection, gst_rtmp_connection,
G_TYPE_OBJECT,
GST_DEBUG_CATEGORY_INIT (gst_rtmp_connection_debug_category,
"rtmpconnection", 0, "debug category for GstRtmpConnection class");
init_set_data_frame_value ());
static void
gst_rtmp_connection_class_init (GstRtmpConnectionClass * klass)
{
GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
gobject_class->dispose = gst_rtmp_connection_dispose;
gobject_class->finalize = gst_rtmp_connection_finalize;
signals[SIGNAL_ERROR] = g_signal_new ("error", G_TYPE_FROM_CLASS (klass),
G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_NONE, 0);
signals[SIGNAL_STREAM_CONTROL] = g_signal_new ("stream-control",
G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL,
G_TYPE_NONE, 2, G_TYPE_INT, G_TYPE_UINT);
GST_DEBUG_REGISTER_FUNCPTR (gst_rtmp_connection_do_read);
}
static void
gst_rtmp_connection_init (GstRtmpConnection * rtmpconnection)
{
rtmpconnection->output_queue =
g_async_queue_new_full ((GDestroyNotify) gst_buffer_unref);
rtmpconnection->input_streams = gst_rtmp_chunk_streams_new ();
rtmpconnection->output_streams = gst_rtmp_chunk_streams_new ();
rtmpconnection->in_chunk_size = GST_RTMP_DEFAULT_CHUNK_SIZE;
rtmpconnection->out_chunk_size = GST_RTMP_DEFAULT_CHUNK_SIZE;
rtmpconnection->input_bytes = g_byte_array_sized_new (2 * READ_SIZE);
rtmpconnection->input_needed_bytes = 1;
g_mutex_init (&rtmpconnection->stats_lock);
}
void
gst_rtmp_connection_dispose (GObject * object)
{
GstRtmpConnection *rtmpconnection = GST_RTMP_CONNECTION (object);
GST_DEBUG_OBJECT (rtmpconnection, "dispose");
/* clean up as possible. may be called multiple times */
gst_rtmp_connection_close (rtmpconnection);
g_cancellable_cancel (rtmpconnection->cancellable);
gst_rtmp_connection_set_input_handler (rtmpconnection, NULL, NULL, NULL);
gst_rtmp_connection_set_output_handler (rtmpconnection, NULL, NULL, NULL);
G_OBJECT_CLASS (gst_rtmp_connection_parent_class)->dispose (object);
}
void
gst_rtmp_connection_finalize (GObject * object)
{
GstRtmpConnection *rtmpconnection = GST_RTMP_CONNECTION (object);
GST_DEBUG_OBJECT (rtmpconnection, "finalize");
/* clean up object here */
g_mutex_clear (&rtmpconnection->stats_lock);
g_clear_object (&rtmpconnection->cancellable);
g_clear_object (&rtmpconnection->connection);
g_clear_pointer (&rtmpconnection->output_queue, g_async_queue_unref);
g_clear_pointer (&rtmpconnection->input_streams, gst_rtmp_chunk_streams_free);
g_clear_pointer (&rtmpconnection->output_streams,
gst_rtmp_chunk_streams_free);
g_clear_pointer (&rtmpconnection->input_bytes, g_byte_array_unref);
g_clear_pointer (&rtmpconnection->main_context, g_main_context_unref);
g_clear_pointer (&rtmpconnection->thread, g_thread_unref);
G_OBJECT_CLASS (gst_rtmp_connection_parent_class)->finalize (object);
}
GSocket *
gst_rtmp_connection_get_socket (GstRtmpConnection * sc)
{
return g_socket_connection_get_socket (sc->connection);
}
static void
gst_rtmp_connection_set_socket_connection (GstRtmpConnection * sc,
GSocketConnection * connection)
{
GInputStream *is;
sc->thread = g_thread_ref (g_thread_self ());
sc->main_context = g_main_context_ref_thread_default ();
sc->connection = g_object_ref (connection);
/* refs the socket because it's creating an input stream, which holds a ref */
is = g_io_stream_get_input_stream (G_IO_STREAM (sc->connection));
/* refs the socket because it's creating a socket source */
g_warn_if_fail (!sc->input_source);
sc->input_source =
g_pollable_input_stream_create_source (G_POLLABLE_INPUT_STREAM (is),
sc->cancellable);
g_source_set_callback (sc->input_source,
(GSourceFunc) gst_rtmp_connection_input_ready, g_object_ref (sc),
g_object_unref);
g_source_attach (sc->input_source, sc->main_context);
}
GstRtmpConnection *
gst_rtmp_connection_new (GSocketConnection * connection,
GCancellable * cancellable)
{
GstRtmpConnection *sc;
sc = g_object_new (GST_TYPE_RTMP_CONNECTION, NULL);
if (cancellable)
sc->cancellable = g_object_ref (cancellable);
else
sc->cancellable = g_cancellable_new ();
gst_rtmp_connection_set_socket_connection (sc, connection);
return sc;
}
static void
cancel_all_commands (GstRtmpConnection * self)
{
GList *l;
for (l = self->transactions; l; l = g_list_next (l)) {
Transaction *cc = l->data;
GST_LOG_OBJECT (self, "calling transaction callback %s",
GST_DEBUG_FUNCPTR_NAME (cc->func));
cc->func ("<cancelled>", NULL, cc->user_data);
}
g_list_free_full (self->transactions, transaction_free);
self->transactions = NULL;
for (l = self->expected_commands; l; l = g_list_next (l)) {
ExpectedCommand *cc = l->data;
GST_LOG_OBJECT (self, "calling expected command callback %s",
GST_DEBUG_FUNCPTR_NAME (cc->func));
cc->func ("<cancelled>", NULL, cc->user_data);
}
g_list_free_full (self->expected_commands, expected_command_free);
self->expected_commands = NULL;
}
void
gst_rtmp_connection_close (GstRtmpConnection * self)
{
if (self->thread != g_thread_self ()) {
GST_ERROR_OBJECT (self, "Called from wrong thread");
}
g_cancellable_cancel (self->cancellable);
cancel_all_commands (self);
if (self->input_source) {
g_source_destroy (self->input_source);
g_clear_pointer (&self->input_source, g_source_unref);
}
if (self->connection) {
g_io_stream_close_async (G_IO_STREAM (self->connection),
G_PRIORITY_DEFAULT, NULL, NULL, NULL);
}
}
void
gst_rtmp_connection_close_and_unref (gpointer ptr)
{
GstRtmpConnection *connection;
g_return_if_fail (ptr);
connection = GST_RTMP_CONNECTION (ptr);
gst_rtmp_connection_close (connection);
g_object_unref (connection);
}
void
gst_rtmp_connection_set_input_handler (GstRtmpConnection * sc,
GstRtmpConnectionMessageFunc callback, gpointer user_data,
GDestroyNotify user_data_destroy)
{
if (sc->input_handler_user_data_destroy) {
sc->input_handler_user_data_destroy (sc->input_handler_user_data);
}
sc->input_handler = callback;
sc->input_handler_user_data = user_data;
sc->input_handler_user_data_destroy = user_data_destroy;
}
void
gst_rtmp_connection_set_output_handler (GstRtmpConnection * sc,
GstRtmpConnectionFunc callback, gpointer user_data,
GDestroyNotify user_data_destroy)
{
if (sc->output_handler_user_data_destroy) {
sc->output_handler_user_data_destroy (sc->output_handler_user_data);
}
sc->output_handler = callback;
sc->output_handler_user_data = user_data;
sc->output_handler_user_data_destroy = user_data_destroy;
}
static gboolean
gst_rtmp_connection_input_ready (GInputStream * is, gpointer user_data)
{
GstRtmpConnection *sc = user_data;
gssize ret;
guint oldsize;
GError *error = NULL;
guint64 bytes_since_ack;
GST_TRACE_OBJECT (sc, "input ready");
oldsize = sc->input_bytes->len;
g_byte_array_set_size (sc->input_bytes, oldsize + READ_SIZE);
ret =
g_pollable_input_stream_read_nonblocking (G_POLLABLE_INPUT_STREAM (is),
sc->input_bytes->data + oldsize, READ_SIZE, sc->cancellable, &error);
g_byte_array_set_size (sc->input_bytes, oldsize + (ret > 0 ? ret : 0));
if (ret < 0) {
gint code = error->code;
if (error->domain == G_IO_ERROR && (code == G_IO_ERROR_WOULD_BLOCK ||
code == G_IO_ERROR_TIMED_OUT || code == G_IO_ERROR_AGAIN)) {
/* should retry */
GST_DEBUG_OBJECT (sc, "read IO error %d %s, continuing",
code, error->message);
g_error_free (error);
return G_SOURCE_CONTINUE;
}
GST_ERROR_OBJECT (sc, "read error: %s %d %s",
g_quark_to_string (error->domain), code, error->message);
g_error_free (error);
} else if (ret == 0) {
GST_INFO_OBJECT (sc, "read EOF");
}
if (ret <= 0) {
gst_rtmp_connection_emit_error (sc);
return G_SOURCE_REMOVE;
}
GST_TRACE_OBJECT (sc, "read %" G_GSIZE_FORMAT " bytes", ret);
g_mutex_lock (&sc->stats_lock);
sc->in_bytes_total += ret;
g_mutex_unlock (&sc->stats_lock);
bytes_since_ack = sc->in_bytes_total - sc->in_bytes_acked;
if (sc->in_window_ack_size && bytes_since_ack >= sc->in_window_ack_size) {
gst_rtmp_connection_send_ack (sc);
}
gst_rtmp_connection_try_read (sc);
return G_SOURCE_CONTINUE;
}
static void
gst_rtmp_connection_start_write (GstRtmpConnection * self)
{
GOutputStream *os;
GstBuffer *message, *chunks;
GstRtmpMeta *meta;
GstRtmpChunkStream *cstream;
if (self->writing) {
return;
}
message = g_async_queue_try_pop (self->output_queue);
if (!message) {
return;
}
meta = gst_buffer_get_rtmp_meta (message);
if (!meta) {
GST_ERROR_OBJECT (self, "No RTMP meta on %" GST_PTR_FORMAT, message);
goto out;
}
if (gst_rtmp_message_is_protocol_control (message)) {
if (!gst_rtmp_connection_prepare_protocol_control (self, message)) {
GST_ERROR_OBJECT (self,
"Failed to prepare protocol control %" GST_PTR_FORMAT, message);
goto out;
}
}
cstream = gst_rtmp_chunk_streams_get (self->output_streams, meta->cstream);
if (!cstream) {
GST_ERROR_OBJECT (self, "Failed to get chunk stream for %" GST_PTR_FORMAT,
message);
goto out;
}
chunks = gst_rtmp_chunk_stream_serialize_all (cstream, message,
self->out_chunk_size);
if (!chunks) {
GST_ERROR_OBJECT (self, "Failed to serialize %" GST_PTR_FORMAT, message);
goto out;
}
self->writing = TRUE;
if (self->output_handler) {
self->output_handler (self, self->output_handler_user_data);
}
os = g_io_stream_get_output_stream (G_IO_STREAM (self->connection));
gst_rtmp_output_stream_write_all_buffer_async (os, chunks, G_PRIORITY_DEFAULT,
self->cancellable, gst_rtmp_connection_write_buffer_done,
g_object_ref (self));
gst_buffer_unref (chunks);
out:
gst_buffer_unref (message);
}
static void
gst_rtmp_connection_emit_error (GstRtmpConnection * self)
{
if (self->error) {
return;
}
GST_INFO_OBJECT (self, "connection error");
self->error = TRUE;
cancel_all_commands (self);
g_signal_emit (self, signals[SIGNAL_ERROR], 0);
}
static void
gst_rtmp_connection_write_buffer_done (GObject * obj,
GAsyncResult * result, gpointer user_data)
{
GOutputStream *os = G_OUTPUT_STREAM (obj);
GstRtmpConnection *self = GST_RTMP_CONNECTION (user_data);
gsize bytes_written = 0;
GError *error = NULL;
gboolean res;
self->writing = FALSE;
res = gst_rtmp_output_stream_write_all_buffer_finish (os, result,
&bytes_written, &error);
g_mutex_lock (&self->stats_lock);
self->out_bytes_total += bytes_written;
g_mutex_unlock (&self->stats_lock);
if (!res) {
if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
GST_INFO_OBJECT (self,
"write cancelled (wrote %" G_GSIZE_FORMAT " bytes)", bytes_written);
} else {
GST_ERROR_OBJECT (self,
"write error: %s (wrote %" G_GSIZE_FORMAT " bytes)",
error->message, bytes_written);
}
gst_rtmp_connection_emit_error (self);
g_error_free (error);
g_object_unref (self);
return;
}
GST_LOG_OBJECT (self, "write completed; wrote %" G_GSIZE_FORMAT " bytes",
bytes_written);
gst_rtmp_connection_apply_protocol_control (self);
gst_rtmp_connection_start_write (self);
g_object_unref (self);
}
static void
gst_rtmp_connection_start_read (GstRtmpConnection * connection,
guint needed_bytes)
{
g_return_if_fail (needed_bytes > 0);
connection->input_needed_bytes = needed_bytes;
gst_rtmp_connection_try_read (connection);
}
static void
gst_rtmp_connection_try_read (GstRtmpConnection * connection)
{
guint need = connection->input_needed_bytes,
len = connection->input_bytes->len;
if (len < need) {
GST_TRACE_OBJECT (connection, "got %u < %u bytes, need more", len, need);
return;
}
GST_TRACE_OBJECT (connection, "got %u >= %u bytes, proceeding", len, need);
gst_rtmp_connection_do_read (connection);
}
static void
gst_rtmp_connection_take_input_bytes (GstRtmpConnection * sc, gsize size,
GBytes ** outbytes)
{
g_return_if_fail (size <= sc->input_bytes->len);
if (outbytes) {
*outbytes = g_bytes_new (sc->input_bytes->data, size);
}
g_byte_array_remove_range (sc->input_bytes, 0, size);
}
static void
gst_rtmp_connection_do_read (GstRtmpConnection * sc)
{
GByteArray *input_bytes = sc->input_bytes;
gsize needed_bytes = 1;
while (1) {
GstRtmpChunkStream *cstream;
guint32 chunk_stream_id, header_size, next_size;
guint8 *data;
chunk_stream_id = gst_rtmp_chunk_stream_parse_id (input_bytes->data,
input_bytes->len);
if (!chunk_stream_id) {
needed_bytes = input_bytes->len + 1;
break;
}
cstream = gst_rtmp_chunk_streams_get (sc->input_streams, chunk_stream_id);
header_size = gst_rtmp_chunk_stream_parse_header (cstream,
input_bytes->data, input_bytes->len);
if (input_bytes->len < header_size) {
needed_bytes = header_size;
break;
}
next_size = gst_rtmp_chunk_stream_parse_payload (cstream,
sc->in_chunk_size, &data);
if (input_bytes->len < header_size + next_size) {
needed_bytes = header_size + next_size;
break;
}
memcpy (data, input_bytes->data + header_size, next_size);
gst_rtmp_connection_take_input_bytes (sc, header_size + next_size, NULL);
next_size = gst_rtmp_chunk_stream_wrote_payload (cstream,
sc->in_chunk_size);
if (next_size == 0) {
GstBuffer *buffer = gst_rtmp_chunk_stream_parse_finish (cstream);
gst_rtmp_connection_handle_message (sc, buffer);
gst_buffer_unref (buffer);
}
}
gst_rtmp_connection_start_read (sc, needed_bytes);
}
static void
gst_rtmp_connection_handle_message (GstRtmpConnection * sc, GstBuffer * buffer)
{
if (gst_rtmp_message_is_protocol_control (buffer)) {
gst_rtmp_connection_handle_protocol_control (sc, buffer);
return;
}
if (gst_rtmp_message_is_user_control (buffer)) {
gst_rtmp_connection_handle_user_control (sc, buffer);
return;
}
switch (gst_rtmp_message_get_type (buffer)) {
case GST_RTMP_MESSAGE_TYPE_COMMAND_AMF0:
gst_rtmp_connection_handle_cm (sc, buffer);
return;
case GST_RTMP_MESSAGE_TYPE_AGGREGATE:
gst_rtmp_connection_handle_aggregate (sc, buffer);
break;
default:
if (sc->input_handler) {
sc->input_handler (sc, buffer, sc->input_handler_user_data);
}
return;
}
}
static void
gst_rtmp_connection_handle_aggregate (GstRtmpConnection * connection,
GstBuffer * buffer)
{
GstRtmpMeta *meta;
GstMapInfo map;
gsize pos = 0;
guint32 first_ts = 0;
meta = gst_buffer_get_rtmp_meta (buffer);
g_return_if_fail (meta);
gst_buffer_map (buffer, &map, GST_MAP_READ);
GST_TRACE_OBJECT (connection, "got aggregate message");
/* Parse Aggregate Messages as described in rtmp_specification_1.0.pdf page 26
* The payload is part of a FLV file.
*
* WARNING: This spec defines the payload to use an "RTMP message format"
* which misidentifies the format of the timestamps and omits the size of the
* backpointers. */
while (pos < map.size) {
gsize remaining = map.size - pos;
GstBuffer *submessage;
GstRtmpMeta *submeta;
GstRtmpFlvTagHeader header;
if (!gst_rtmp_flv_tag_parse_header (&header, map.data + pos, remaining)) {
GST_ERROR_OBJECT (connection,
"aggregate contains incomplete header; want %d, got %" G_GSIZE_FORMAT,
GST_RTMP_FLV_TAG_HEADER_SIZE, remaining);
break;
}
if (remaining < header.total_size) {
GST_ERROR_OBJECT (connection,
"aggregate contains incomplete message; want %" G_GSIZE_FORMAT
", got %" G_GSIZE_FORMAT, header.total_size, remaining);
break;
}
submessage = gst_buffer_copy_region (buffer, GST_BUFFER_COPY_FLAGS |
GST_BUFFER_COPY_META | GST_BUFFER_COPY_MEMORY,
pos + GST_RTMP_FLV_TAG_HEADER_SIZE, header.payload_size);
GST_BUFFER_DTS (submessage) = GST_BUFFER_DTS (buffer);
GST_BUFFER_OFFSET (submessage) = GST_BUFFER_OFFSET (buffer) + pos;
GST_BUFFER_OFFSET_END (submessage) =
GST_BUFFER_OFFSET (submessage) + header.total_size;
submeta = gst_buffer_get_rtmp_meta (submessage);
g_assert (submeta);
submeta->type = header.type;
submeta->size = header.payload_size;
if (pos == 0) {
first_ts = header.timestamp;
} else {
guint32 ts_offset = header.timestamp - first_ts;
submeta->ts_delta += ts_offset;
GST_BUFFER_DTS (submessage) += ts_offset * GST_MSECOND;
GST_BUFFER_FLAG_UNSET (submessage, GST_BUFFER_FLAG_DISCONT);
}
gst_rtmp_buffer_dump (submessage, "<<< submessage");
gst_rtmp_connection_handle_message (connection, submessage);
gst_buffer_unref (submessage);
pos += header.total_size;
}
gst_buffer_unmap (buffer, &map);
}
static void
gst_rtmp_connection_handle_protocol_control (GstRtmpConnection * connection,
GstBuffer * buffer)
{
GstRtmpProtocolControl pc;
if (!gst_rtmp_message_parse_protocol_control (buffer, &pc)) {
GST_ERROR_OBJECT (connection, "can't parse protocol control message");
return;
}
GST_LOG_OBJECT (connection, "got protocol control message %d:%s", pc.type,
gst_rtmp_message_type_get_nick (pc.type));
switch (pc.type) {
case GST_RTMP_MESSAGE_TYPE_SET_CHUNK_SIZE:
GST_INFO_OBJECT (connection, "incoming chunk size %" G_GUINT32_FORMAT,
pc.param);
gst_rtmp_connection_handle_set_chunk_size (connection, pc.param);
break;
case GST_RTMP_MESSAGE_TYPE_ABORT_MESSAGE:
GST_ERROR_OBJECT (connection, "unimplemented: chunk abort, stream_id = %"
G_GUINT32_FORMAT, pc.param);
break;
case GST_RTMP_MESSAGE_TYPE_ACKNOWLEDGEMENT:
GST_DEBUG_OBJECT (connection, "acknowledgement %" G_GUINT32_FORMAT,
pc.param);
gst_rtmp_connection_handle_ack (connection, pc.param);
break;
case GST_RTMP_MESSAGE_TYPE_WINDOW_ACK_SIZE:
GST_INFO_OBJECT (connection,
"incoming window ack size: %" G_GUINT32_FORMAT, pc.param);
gst_rtmp_connection_handle_window_ack_size (connection, pc.param);
break;
case GST_RTMP_MESSAGE_TYPE_SET_PEER_BANDWIDTH:
GST_FIXME_OBJECT (connection, "set peer bandwidth: %" G_GUINT32_FORMAT
", %" G_GUINT32_FORMAT, pc.param, pc.param2);
/* FIXME this is not correct, but close enough */
gst_rtmp_connection_request_window_size (connection, pc.param);
break;
default:
GST_ERROR_OBJECT (connection, "unimplemented protocol control type %d:%s",
pc.type, gst_rtmp_message_type_get_nick (pc.type));
break;
}
}
static void
gst_rtmp_connection_handle_user_control (GstRtmpConnection * connection,
GstBuffer * buffer)
{
GstRtmpUserControl uc;
if (!gst_rtmp_message_parse_user_control (buffer, &uc)) {
GST_ERROR_OBJECT (connection, "can't parse user control message");
return;
}
GST_LOG_OBJECT (connection, "got user control message %d:%s", uc.type,
gst_rtmp_user_control_type_get_nick (uc.type));
switch (uc.type) {
case GST_RTMP_USER_CONTROL_TYPE_STREAM_BEGIN:
case GST_RTMP_USER_CONTROL_TYPE_STREAM_EOF:
case GST_RTMP_USER_CONTROL_TYPE_STREAM_DRY:
case GST_RTMP_USER_CONTROL_TYPE_STREAM_IS_RECORDED:
GST_INFO_OBJECT (connection, "stream %u got %s", uc.param,
gst_rtmp_user_control_type_get_nick (uc.type));
g_signal_emit (connection, signals[SIGNAL_STREAM_CONTROL], 0,
uc.type, uc.param);
break;
case GST_RTMP_USER_CONTROL_TYPE_SET_BUFFER_LENGTH:
GST_FIXME_OBJECT (connection, "ignoring set buffer length: %"
G_GUINT32_FORMAT ", %" G_GUINT32_FORMAT " ms", uc.param, uc.param2);
break;
case GST_RTMP_USER_CONTROL_TYPE_PING_REQUEST:
GST_DEBUG_OBJECT (connection, "ping request: %" G_GUINT32_FORMAT,
uc.param);
gst_rtmp_connection_send_ping_response (connection, uc.param);
break;
case GST_RTMP_USER_CONTROL_TYPE_PING_RESPONSE:
GST_DEBUG_OBJECT (connection,
"ignoring ping response: %" G_GUINT32_FORMAT, uc.param);
break;
case GST_RTMP_USER_CONTROL_TYPE_BUFFER_EMPTY:
GST_LOG_OBJECT (connection, "ignoring buffer empty: %" G_GUINT32_FORMAT,
uc.param);
break;
case GST_RTMP_USER_CONTROL_TYPE_BUFFER_READY:
GST_LOG_OBJECT (connection, "ignoring buffer ready: %" G_GUINT32_FORMAT,
uc.param);
break;
default:
GST_ERROR_OBJECT (connection, "unimplemented user control type %d:%s",
uc.type, gst_rtmp_user_control_type_get_nick (uc.type));
break;
}
}
static void
gst_rtmp_connection_handle_set_chunk_size (GstRtmpConnection * self,
guint32 chunk_size)
{
if (chunk_size < GST_RTMP_MINIMUM_CHUNK_SIZE) {
GST_ERROR_OBJECT (self,
"peer requested chunk size %" G_GUINT32_FORMAT "; too small",
chunk_size);
return;
}
if (chunk_size > GST_RTMP_MAXIMUM_CHUNK_SIZE) {
GST_ERROR_OBJECT (self,
"peer requested chunk size %" G_GUINT32_FORMAT "; too large",
chunk_size);
return;
}
if (chunk_size < GST_RTMP_DEFAULT_CHUNK_SIZE) {
GST_WARNING_OBJECT (self,
"peer requested small chunk size %" G_GUINT32_FORMAT, chunk_size);
}
g_mutex_lock (&self->stats_lock);
self->in_chunk_size = chunk_size;
g_mutex_unlock (&self->stats_lock);
}
static void
gst_rtmp_connection_handle_ack (GstRtmpConnection * self, guint32 bytes)
{
guint64 last_ack, new_ack;
guint32 last_ack_low, last_ack_high;
last_ack = self->out_bytes_acked;
last_ack_low = last_ack & G_MAXUINT32;
last_ack_high = (last_ack >> 32) & G_MAXUINT32;
if (bytes < last_ack_low) {
GST_WARNING_OBJECT (self,
"Acknowledgement bytes regression, assuming rollover: %"
G_GUINT32_FORMAT " < %" G_GUINT32_FORMAT, bytes, last_ack_low);
last_ack_high += 1;
}
new_ack = (((guint64) last_ack_high) << 32) | bytes;
GST_LOG_OBJECT (self, "Peer acknowledged %" G_GUINT64_FORMAT " bytes",
new_ack - last_ack);
g_mutex_lock (&self->stats_lock);
self->out_bytes_acked = new_ack;
g_mutex_unlock (&self->stats_lock);
}
static void
gst_rtmp_connection_handle_window_ack_size (GstRtmpConnection * self,
guint32 window_ack_size)
{
if (window_ack_size < GST_RTMP_DEFAULT_WINDOW_ACK_SIZE) {
GST_WARNING_OBJECT (self,
"peer requested small window ack size %" G_GUINT32_FORMAT,
window_ack_size);
}
g_mutex_lock (&self->stats_lock);
self->in_window_ack_size = window_ack_size;
g_mutex_unlock (&self->stats_lock);
}
static gboolean
is_command_response (const gchar * command_name)
{
return g_strcmp0 (command_name, "_result") == 0 ||
g_strcmp0 (command_name, "_error") == 0;
}
static void
gst_rtmp_connection_handle_cm (GstRtmpConnection * sc, GstBuffer * buffer)
{
GstRtmpMeta *meta;
gchar *command_name;
gdouble transaction_id;
GPtrArray *args;
meta = gst_buffer_get_rtmp_meta (buffer);
g_return_if_fail (meta);
{
GstMapInfo map;
gst_buffer_map (buffer, &map, GST_MAP_READ);
args = gst_amf_parse_command (map.data, map.size, &transaction_id,
&command_name);
gst_buffer_unmap (buffer, &map);
}
if (!args) {
return;
}
if (!isfinite (transaction_id) || transaction_id < 0 ||
transaction_id > G_MAXUINT) {
GST_WARNING_OBJECT (sc,
"Server sent command \"%s\" with extreme transaction ID %.0f",
GST_STR_NULL (command_name), transaction_id);
} else if (transaction_id > sc->transaction_count) {
GST_WARNING_OBJECT (sc,
"Server sent command \"%s\" with unused transaction ID (%.0f > %u)",
GST_STR_NULL (command_name), transaction_id, sc->transaction_count);
sc->transaction_count = transaction_id;
}
GST_DEBUG_OBJECT (sc,
"got control message \"%s\" transaction %.0f size %"
G_GUINT32_FORMAT, GST_STR_NULL (command_name), transaction_id,
meta->size);
if (is_command_response (command_name)) {
if (transaction_id != 0) {
GList *l;
for (l = sc->transactions; l; l = g_list_next (l)) {
Transaction *t = l->data;
if (t->transaction_id != transaction_id) {
continue;
}
GST_LOG_OBJECT (sc, "calling transaction callback %s",
GST_DEBUG_FUNCPTR_NAME (t->func));
sc->transactions = g_list_remove_link (sc->transactions, l);
t->func (command_name, args, t->user_data);
g_list_free_full (l, transaction_free);
break;
}
} else {
GST_WARNING_OBJECT (sc, "Server sent response \"%s\" without transaction",
GST_STR_NULL (command_name));
}
} else {
GList *l;
if (transaction_id != 0) {
GST_FIXME_OBJECT (sc, "Server sent command \"%s\" expecting reply",
GST_STR_NULL (command_name));
}
for (l = sc->expected_commands; l; l = g_list_next (l)) {
ExpectedCommand *ec = l->data;
if (ec->stream_id != meta->mstream) {
continue;
}
if (g_strcmp0 (ec->command_name, command_name)) {
continue;
}
GST_LOG_OBJECT (sc, "calling expected command callback %s",
GST_DEBUG_FUNCPTR_NAME (ec->func));
sc->expected_commands = g_list_remove_link (sc->expected_commands, l);
ec->func (command_name, args, ec->user_data);
g_list_free_full (l, expected_command_free);
break;
}
}
g_free (command_name);
g_ptr_array_unref (args);
}
static gboolean
start_write (gpointer user_data)
{
GstRtmpConnection *sc = user_data;
gst_rtmp_connection_start_write (sc);
return G_SOURCE_REMOVE;
}
void
gst_rtmp_connection_queue_message (GstRtmpConnection * self, GstBuffer * buffer)
{
g_return_if_fail (GST_IS_RTMP_CONNECTION (self));
g_return_if_fail (GST_IS_BUFFER (buffer));
g_async_queue_push (self->output_queue, buffer);
g_main_context_invoke_full (self->main_context, G_PRIORITY_DEFAULT,
start_write, g_object_ref (self), g_object_unref);
}
guint
gst_rtmp_connection_get_num_queued (GstRtmpConnection * connection)
{
return g_async_queue_length (connection->output_queue);
}
guint
gst_rtmp_connection_send_command (GstRtmpConnection * connection,
GstRtmpCommandCallback response_command, gpointer user_data,
guint32 stream_id, const gchar * command_name, const GstAmfNode * argument,
...)
{
GstBuffer *buffer;
gdouble transaction_id = 0;
va_list ap;
GBytes *payload;
guint8 *data;
gsize size;
g_return_val_if_fail (GST_IS_RTMP_CONNECTION (connection), 0);
if (connection->thread != g_thread_self ()) {
GST_ERROR_OBJECT (connection, "Called from wrong thread");
}
GST_DEBUG_OBJECT (connection,
"Sending command '%s' on stream id %" G_GUINT32_FORMAT,
command_name, stream_id);
if (response_command) {
Transaction *t;
transaction_id = ++connection->transaction_count;
GST_LOG_OBJECT (connection, "Registering %s for transid %.0f",
GST_DEBUG_FUNCPTR_NAME (response_command), transaction_id);
t = transaction_new (transaction_id, response_command, user_data);
connection->transactions = g_list_append (connection->transactions, t);
}
va_start (ap, argument);
payload = gst_amf_serialize_command_valist (transaction_id,
command_name, argument, ap);
va_end (ap);
data = g_bytes_unref_to_data (payload, &size);
buffer = gst_rtmp_message_new_wrapped (GST_RTMP_MESSAGE_TYPE_COMMAND_AMF0,
3, stream_id, data, size);
gst_rtmp_connection_queue_message (connection, buffer);
return transaction_id;
}
void
gst_rtmp_connection_expect_command (GstRtmpConnection * connection,
GstRtmpCommandCallback response_command, gpointer user_data,
guint32 stream_id, const gchar * command_name)
{
ExpectedCommand *ec;
g_return_if_fail (response_command);
g_return_if_fail (command_name);
g_return_if_fail (!is_command_response (command_name));
GST_LOG_OBJECT (connection,
"Registering %s for stream id %" G_GUINT32_FORMAT " name \"%s\"",
GST_DEBUG_FUNCPTR_NAME (response_command), stream_id, command_name);
ec = expected_command_new (stream_id, command_name, response_command,
user_data);
connection->expected_commands =
g_list_append (connection->expected_commands, ec);
}
static void
gst_rtmp_connection_send_ack (GstRtmpConnection * connection)
{
guint64 in_bytes_total = connection->in_bytes_total;
GstRtmpProtocolControl pc = {
.type = GST_RTMP_MESSAGE_TYPE_ACKNOWLEDGEMENT,
.param = (guint32) in_bytes_total,
};
gst_rtmp_connection_queue_message (connection,
gst_rtmp_message_new_protocol_control (&pc));
g_mutex_lock (&connection->stats_lock);
connection->in_bytes_acked = in_bytes_total;
g_mutex_unlock (&connection->stats_lock);
}
static void
gst_rtmp_connection_send_ping_response (GstRtmpConnection * connection,
guint32 event_data)
{
GstRtmpUserControl uc = {
.type = GST_RTMP_USER_CONTROL_TYPE_PING_RESPONSE,
.param = event_data,
};
gst_rtmp_connection_queue_message (connection,
gst_rtmp_message_new_user_control (&uc));
}
void
gst_rtmp_connection_set_chunk_size (GstRtmpConnection * connection,
guint32 chunk_size)
{
GstRtmpProtocolControl pc = {
.type = GST_RTMP_MESSAGE_TYPE_SET_CHUNK_SIZE,
.param = chunk_size,
};
g_return_if_fail (GST_IS_RTMP_CONNECTION (connection));
gst_rtmp_connection_queue_message (connection,
gst_rtmp_message_new_protocol_control (&pc));
}
void
gst_rtmp_connection_request_window_size (GstRtmpConnection * connection,
guint32 window_ack_size)
{
GstRtmpProtocolControl pc = {
.type = GST_RTMP_MESSAGE_TYPE_WINDOW_ACK_SIZE,
.param = window_ack_size,
};
g_return_if_fail (GST_IS_RTMP_CONNECTION (connection));
gst_rtmp_connection_queue_message (connection,
gst_rtmp_message_new_protocol_control (&pc));
}
void
gst_rtmp_connection_set_data_frame (GstRtmpConnection * connection,
GstBuffer * buffer)
{
g_return_if_fail (GST_IS_RTMP_CONNECTION (connection));
g_return_if_fail (GST_IS_BUFFER (buffer));
gst_buffer_prepend_memory (buffer, gst_memory_ref (set_data_frame_value));
gst_rtmp_connection_queue_message (connection, buffer);
}
static gboolean
gst_rtmp_connection_prepare_protocol_control (GstRtmpConnection * self,
GstBuffer * buffer)
{
GstRtmpProtocolControl pc;
if (!gst_rtmp_message_parse_protocol_control (buffer, &pc)) {
GST_ERROR_OBJECT (self, "can't parse protocol control message");
return FALSE;
}
switch (pc.type) {
case GST_RTMP_MESSAGE_TYPE_SET_CHUNK_SIZE:{
guint32 chunk_size = pc.param;
GST_INFO_OBJECT (self, "pending chunk size %" G_GUINT32_FORMAT,
chunk_size);
if (chunk_size < GST_RTMP_MINIMUM_CHUNK_SIZE) {
GST_ERROR_OBJECT (self,
"requested chunk size %" G_GUINT32_FORMAT " is too small",
chunk_size);
return FALSE;
}
if (chunk_size > GST_RTMP_MAXIMUM_CHUNK_SIZE) {
GST_ERROR_OBJECT (self,
"requested chunk size %" G_GUINT32_FORMAT " is too large",
chunk_size);
return FALSE;
}
if (chunk_size < GST_RTMP_DEFAULT_CHUNK_SIZE) {
GST_WARNING_OBJECT (self,
"requesting small chunk size %" G_GUINT32_FORMAT, chunk_size);
}
self->out_chunk_size_pending = pc.param;
break;
}
case GST_RTMP_MESSAGE_TYPE_WINDOW_ACK_SIZE:{
guint32 window_ack_size = pc.param;
GST_INFO_OBJECT (self, "pending window ack size: %" G_GUINT32_FORMAT,
window_ack_size);
if (window_ack_size < GST_RTMP_DEFAULT_WINDOW_ACK_SIZE) {
GST_WARNING_OBJECT (self,
"requesting small window ack size %" G_GUINT32_FORMAT,
window_ack_size);
}
self->out_window_ack_size_pending = window_ack_size;
break;
}
default:
break;
}
return TRUE;
}
static void
gst_rtmp_connection_apply_protocol_control (GstRtmpConnection * self)
{
guint32 chunk_size, window_ack_size;
chunk_size = self->out_chunk_size_pending;
if (chunk_size) {
self->out_chunk_size_pending = 0;
g_mutex_lock (&self->stats_lock);
self->out_chunk_size = chunk_size;
g_mutex_unlock (&self->stats_lock);
GST_INFO_OBJECT (self, "applied chunk size %" G_GUINT32_FORMAT, chunk_size);
}
window_ack_size = self->out_window_ack_size_pending;
if (window_ack_size) {
self->out_window_ack_size_pending = 0;
g_mutex_lock (&self->stats_lock);
self->out_window_ack_size = window_ack_size;
g_mutex_unlock (&self->stats_lock);
GST_INFO_OBJECT (self, "applied window ack size %" G_GUINT32_FORMAT,
window_ack_size);
}
}
static GstStructure *
get_stats (GstRtmpConnection * self)
{
return gst_structure_new ("GstRtmpConnectionStats",
"in-chunk-size", G_TYPE_UINT, self ? self->in_chunk_size : 0,
"out-chunk-size", G_TYPE_UINT, self ? self->out_chunk_size : 0,
"in-window-ack-size", G_TYPE_UINT, self ? self->in_window_ack_size : 0,
"out-window-ack-size", G_TYPE_UINT, self ? self->out_window_ack_size : 0,
"in-bytes-total", G_TYPE_UINT64, self ? self->in_bytes_total : 0,
"out-bytes-total", G_TYPE_UINT64, self ? self->out_bytes_total : 0,
"in-bytes-acked", G_TYPE_UINT64, self ? self->in_bytes_acked : 0,
"out-bytes-acked", G_TYPE_UINT64, self ? self->out_bytes_acked : 0, NULL);
}
GstStructure *
gst_rtmp_connection_get_null_stats (void)
{
return get_stats (NULL);
}
GstStructure *
gst_rtmp_connection_get_stats (GstRtmpConnection * self)
{
GstStructure *s;
g_return_val_if_fail (GST_IS_RTMP_CONNECTION (self), NULL);
g_mutex_lock (&self->stats_lock);
s = get_stats (self);
g_mutex_unlock (&self->stats_lock);
return s;
}