rtmp2: Chunk messages as buffers in loop thread

Move output chunking from gst_rtmp_connection_queue_message into
gst_rtmp_connection_start_write, which effectively moves it from the
streaming thread into the loop thread.

This allows us to handle the outgoing chunk-size message (which is
generated by changing the future chunk-size property) properly, which
could come from any other thread.
This commit is contained in:
Jan Alexander Steffens (heftig) 2020-01-27 14:05:31 +01:00 committed by GStreamer Merge Bot
parent 9a13df9ba5
commit a566461294
2 changed files with 30 additions and 19 deletions

View file

@ -643,7 +643,7 @@ gst_rtmp_chunk_stream_serialize_start (GstRtmpChunkStream * cstream,
gst_rtmp_buffer_dump (buffer, ">>> message");
chunk_stream_clear (cstream);
chunk_stream_take_buffer (cstream, buffer);
chunk_stream_take_buffer (cstream, gst_buffer_ref (buffer));
return serialize_next (cstream, chunk_size, type);
}

View file

@ -464,17 +464,39 @@ static void
gst_rtmp_connection_start_write (GstRtmpConnection * self)
{
GOutputStream *os;
GstBuffer *chunks;
GstBuffer *message, *chunks;
GstRtmpMeta *meta;
GstRtmpChunkStream *cstream;
if (self->writing) {
return;
}
chunks = g_async_queue_try_pop (self->output_queue);
if (!chunks) {
message = g_async_queue_try_pop (self->output_queue);
if (!message) {
return;
}
meta = gst_buffer_get_rtmp_meta (message);
if (!meta) {
GST_ERROR_OBJECT (self, "No RTMP meta on %" GST_PTR_FORMAT, message);
goto out;
}
cstream = gst_rtmp_chunk_streams_get (self->output_streams, meta->cstream);
if (!cstream) {
GST_ERROR_OBJECT (self, "Failed to get chunk stream for %" GST_PTR_FORMAT,
message);
goto out;
}
chunks = gst_rtmp_chunk_stream_serialize_all (cstream, message,
self->out_chunk_size);
if (!chunks) {
GST_ERROR_OBJECT (self, "Failed to serialize %" GST_PTR_FORMAT, message);
goto out;
}
self->writing = TRUE;
if (self->output_handler) {
self->output_handler (self, self->output_handler_user_data);
@ -486,6 +508,9 @@ gst_rtmp_connection_start_write (GstRtmpConnection * self)
g_object_ref (self));
gst_buffer_unref (chunks);
out:
gst_buffer_unref (message);
}
static void
@ -910,24 +935,10 @@ start_write (gpointer user_data)
void
gst_rtmp_connection_queue_message (GstRtmpConnection * self, GstBuffer * buffer)
{
GstRtmpMeta *meta;
GstRtmpChunkStream *cstream;
GstBuffer *chunks;
g_return_if_fail (GST_IS_RTMP_CONNECTION (self));
g_return_if_fail (GST_IS_BUFFER (buffer));
meta = gst_buffer_get_rtmp_meta (buffer);
g_return_if_fail (meta);
cstream = gst_rtmp_chunk_streams_get (self->output_streams, meta->cstream);
g_return_if_fail (cstream);
chunks = gst_rtmp_chunk_stream_serialize_all (cstream, buffer,
self->out_chunk_size);
g_return_if_fail (chunks);
g_async_queue_push (self->output_queue, chunks);
g_async_queue_push (self->output_queue, buffer);
g_main_context_invoke_full (self->main_context, G_PRIORITY_DEFAULT,
start_write, g_object_ref (self), g_object_unref);
}