rtmp2: Consistently use GstBuffer for RTMP chunks

This commit is contained in:
Jan Alexander Steffens (heftig) 2020-02-12 16:55:15 +01:00 committed by GStreamer Merge Bot
parent b03780233e
commit 9a13df9ba5

View file

@ -97,7 +97,7 @@ static void gst_rtmp_connection_emit_error (GstRtmpConnection * self);
static gboolean gst_rtmp_connection_input_ready (GInputStream * is,
gpointer user_data);
static void gst_rtmp_connection_start_write (GstRtmpConnection * self);
static void gst_rtmp_connection_write_bytes_done (GObject * obj,
static void gst_rtmp_connection_write_buffer_done (GObject * obj,
GAsyncResult * result, gpointer user_data);
static void gst_rtmp_connection_start_read (GstRtmpConnection * sc,
guint needed_bytes);
@ -232,7 +232,7 @@ gst_rtmp_connection_init (GstRtmpConnection * rtmpconnection)
{
rtmpconnection->cancellable = g_cancellable_new ();
rtmpconnection->output_queue =
g_async_queue_new_full ((GDestroyNotify) g_bytes_unref);
g_async_queue_new_full ((GDestroyNotify) gst_buffer_unref);
rtmpconnection->input_streams = gst_rtmp_chunk_streams_new ();
rtmpconnection->output_streams = gst_rtmp_chunk_streams_new ();
@ -464,14 +464,14 @@ static void
gst_rtmp_connection_start_write (GstRtmpConnection * self)
{
GOutputStream *os;
GBytes *bytes;
GstBuffer *chunks;
if (self->writing) {
return;
}
bytes = g_async_queue_try_pop (self->output_queue);
if (!bytes) {
chunks = g_async_queue_try_pop (self->output_queue);
if (!chunks) {
return;
}
@ -481,10 +481,11 @@ gst_rtmp_connection_start_write (GstRtmpConnection * self)
}
os = g_io_stream_get_output_stream (G_IO_STREAM (self->connection));
gst_rtmp_output_stream_write_all_bytes_async (os, bytes,
G_PRIORITY_DEFAULT, self->cancellable,
gst_rtmp_connection_write_bytes_done, g_object_ref (self));
g_bytes_unref (bytes);
gst_rtmp_output_stream_write_all_buffer_async (os, chunks, G_PRIORITY_DEFAULT,
self->cancellable, gst_rtmp_connection_write_buffer_done,
g_object_ref (self));
gst_buffer_unref (chunks);
}
static void
@ -503,7 +504,7 @@ gst_rtmp_connection_emit_error (GstRtmpConnection * self)
}
static void
gst_rtmp_connection_write_bytes_done (GObject * obj,
gst_rtmp_connection_write_buffer_done (GObject * obj,
GAsyncResult * result, gpointer user_data)
{
GOutputStream *os = G_OUTPUT_STREAM (obj);
@ -513,7 +514,9 @@ gst_rtmp_connection_write_bytes_done (GObject * obj,
self->writing = FALSE;
res = gst_rtmp_output_stream_write_all_bytes_finish (os, result, &error);
res = gst_rtmp_output_stream_write_all_buffer_finish (os, result, NULL,
&error);
if (!res) {
if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
GST_INFO_OBJECT (self, "write cancelled");
@ -904,26 +907,12 @@ start_write (gpointer user_data)
return G_SOURCE_REMOVE;
}
static void
byte_array_take_buffer (GByteArray * byte_array, GstBuffer * buffer)
{
GstMapInfo map;
gboolean ret;
ret = gst_buffer_map (buffer, &map, GST_MAP_READ);
g_assert (ret);
g_assert (byte_array->len + map.size <= (guint64) G_MAXUINT);
g_byte_array_append (byte_array, map.data, map.size);
gst_buffer_unmap (buffer, &map);
gst_buffer_unref (buffer);
}
void
gst_rtmp_connection_queue_message (GstRtmpConnection * self, GstBuffer * buffer)
{
GstRtmpMeta *meta;
GstRtmpChunkStream *cstream;
GstBuffer *out_buffer;
GByteArray *out_ba;
GstBuffer *chunks;
g_return_if_fail (GST_IS_RTMP_CONNECTION (self));
g_return_if_fail (GST_IS_BUFFER (buffer));
@ -934,20 +923,11 @@ gst_rtmp_connection_queue_message (GstRtmpConnection * self, GstBuffer * buffer)
cstream = gst_rtmp_chunk_streams_get (self->output_streams, meta->cstream);
g_return_if_fail (cstream);
out_buffer = gst_rtmp_chunk_stream_serialize_start (cstream, buffer,
chunks = gst_rtmp_chunk_stream_serialize_all (cstream, buffer,
self->out_chunk_size);
g_return_if_fail (out_buffer);
g_return_if_fail (chunks);
out_ba = g_byte_array_new ();
while (out_buffer) {
byte_array_take_buffer (out_ba, out_buffer);
out_buffer = gst_rtmp_chunk_stream_serialize_next (cstream,
self->out_chunk_size);
}
g_async_queue_push (self->output_queue, g_byte_array_free_to_bytes (out_ba));
g_async_queue_push (self->output_queue, chunks);
g_main_context_invoke_full (self->main_context, G_PRIORITY_DEFAULT,
start_write, g_object_ref (self), g_object_unref);
}