rtmp2: Add gst_rtmp_output_stream_write_all_buffer_async

Similar to gst_rtmp_output_stream_write_all_bytes_async, but takes a
GstBuffer instead of a GBytes. It can also return the number of bytes
written, which might be lower in case of an error.
This commit is contained in:
Jan Alexander Steffens (heftig) 2020-02-12 16:43:30 +01:00 committed by GStreamer Merge Bot
parent 286a3829b6
commit cb7f0c4be7
2 changed files with 112 additions and 1 deletions

View file

@ -30,6 +30,8 @@ static void read_all_bytes_done (GObject * source, GAsyncResult * result,
gpointer user_data);
static void write_all_bytes_done (GObject * source, GAsyncResult * result,
gpointer user_data);
static void write_all_buffer_done (GObject * source, GAsyncResult * result,
gpointer user_data);
void
gst_rtmp_byte_array_append_bytes (GByteArray * bytearray, GBytes * bytes)
@ -154,6 +156,108 @@ gst_rtmp_output_stream_write_all_bytes_finish (GOutputStream * stream,
return g_task_propagate_boolean (G_TASK (result), error);
}
typedef struct
{
GstBuffer *buffer;
GstMapInfo map;
gboolean mapped;
gsize bytes_written;
} WriteAllBufferData;
static WriteAllBufferData *
write_all_buffer_data_new (GstBuffer * buffer)
{
WriteAllBufferData *data = g_slice_new0 (WriteAllBufferData);
data->buffer = gst_buffer_ref (buffer);
return data;
}
static void
write_all_buffer_data_free (gpointer ptr)
{
WriteAllBufferData *data = ptr;
if (data->mapped) {
gst_buffer_unmap (data->buffer, &data->map);
}
g_clear_pointer (&data->buffer, gst_buffer_unref);
g_slice_free (WriteAllBufferData, data);
}
void
gst_rtmp_output_stream_write_all_buffer_async (GOutputStream * stream,
GstBuffer * buffer, int io_priority, GCancellable * cancellable,
GAsyncReadyCallback callback, gpointer user_data)
{
GTask *task;
WriteAllBufferData *data;
g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
g_return_if_fail (GST_IS_BUFFER (buffer));
task = g_task_new (stream, cancellable, callback, user_data);
data = write_all_buffer_data_new (buffer);
g_task_set_task_data (task, data, write_all_buffer_data_free);
if (!gst_buffer_map (buffer, &data->map, GST_MAP_READ)) {
GST_ERROR ("Failed to map %" GST_PTR_FORMAT, buffer);
g_task_return_new_error (task, GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_READ,
"Failed to map buffer for reading");
g_object_unref (task);
return;
}
data->mapped = TRUE;
g_output_stream_write_all_async (stream, data->map.data, data->map.size,
io_priority, cancellable, write_all_buffer_done, task);
}
static void
write_all_buffer_done (GObject * source, GAsyncResult * result,
gpointer user_data)
{
GOutputStream *os = G_OUTPUT_STREAM (source);
GTask *task = user_data;
WriteAllBufferData *data = g_task_get_task_data (task);
GError *error = NULL;
gboolean res;
res = g_output_stream_write_all_finish (os, result, &data->bytes_written,
&error);
gst_buffer_unmap (data->buffer, &data->map);
data->mapped = FALSE;
if (!res) {
g_task_return_error (task, error);
g_object_unref (task);
return;
}
g_task_return_boolean (task, TRUE);
g_object_unref (task);
}
gboolean
gst_rtmp_output_stream_write_all_buffer_finish (GOutputStream * stream,
GAsyncResult * result, gsize * bytes_written, GError ** error)
{
WriteAllBufferData *data;
GTask *task;
g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
task = G_TASK (result);
data = g_task_get_task_data (task);
if (bytes_written) {
*bytes_written = data->bytes_written;
}
return g_task_propagate_boolean (task, error);
}
static const gchar ascii_table[128] = {
0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,

View file

@ -20,6 +20,7 @@
#ifndef _GST_RTMP_UTILS_H_
#define _GST_RTMP_UTILS_H_
#include <gst/gst.h>
#include <gio/gio.h>
G_BEGIN_DECLS
@ -38,7 +39,13 @@ void gst_rtmp_output_stream_write_all_bytes_async (GOutputStream * stream,
gboolean gst_rtmp_output_stream_write_all_bytes_finish (GOutputStream * stream,
GAsyncResult * result, GError ** error);
void gst_rtmp_string_print_escaped (GString * string, const gchar *data,
void gst_rtmp_output_stream_write_all_buffer_async (GOutputStream * stream,
GstBuffer * buffer, int io_priority, GCancellable * cancellable,
GAsyncReadyCallback callback, gpointer user_data);
gboolean gst_rtmp_output_stream_write_all_buffer_finish (GOutputStream * stream,
GAsyncResult * result, gsize * bytes_written, GError ** error);
void gst_rtmp_string_print_escaped (GString * string, const gchar * data,
gssize size);
G_END_DECLS