From 7f5347a66481f0834440e1ac7a64b1e4a74b4e7e Mon Sep 17 00:00:00 2001 From: Seungha Yang Date: Fri, 27 Mar 2020 15:40:00 +0900 Subject: [PATCH] rtmp2src: Add idle-timeout property Add new property to signalling that there is no incoming data from peer. This can be useful if users want to stop the streaming when the connection is alive but no packet is arriving. --- gst/rtmp2/gstrtmp2src.c | 105 ++++++++++++++++++++++++++++++++++++++-- 1 file changed, 101 insertions(+), 4 deletions(-) diff --git a/gst/rtmp2/gstrtmp2src.c b/gst/rtmp2/gstrtmp2src.c index 4ed4f98dbf..dff4eac81a 100644 --- a/gst/rtmp2/gstrtmp2src.c +++ b/gst/rtmp2/gstrtmp2src.c @@ -60,6 +60,7 @@ typedef struct GstRtmpLocation location; gboolean async_connect; GstStructure *stats; + guint idle_timeout; /* If both self->lock and OBJECT_LOCK are needed, * self->lock must be taken first */ @@ -67,6 +68,8 @@ typedef struct GCond cond; gboolean running, flushing; + gboolean timeout; + gboolean started; GstTask *task; GRecMutex task_lock; @@ -133,8 +136,11 @@ enum PROP_FLASH_VERSION, PROP_ASYNC_CONNECT, PROP_STATS, + PROP_IDLE_TIMEOUT, }; +#define DEFAULT_IDLE_TIMEOUT 0 + /* pad templates */ static GstStaticPadTemplate gst_rtmp2_src_src_template = @@ -200,6 +206,13 @@ gst_rtmp2_src_class_init (GstRtmp2SrcClass * klass) g_param_spec_boxed ("stats", "Stats", "Retrieve a statistics structure", GST_TYPE_STRUCTURE, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS)); + g_object_class_install_property (gobject_class, PROP_IDLE_TIMEOUT, + g_param_spec_uint ("idle-timeout", "Idle timeout", + "The maximum allowed time in seconds for valid packets not to arrive " + "from the peer (0 = no timeout)", + 0, G_MAXUINT, DEFAULT_IDLE_TIMEOUT, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + GST_DEBUG_CATEGORY_INIT (gst_rtmp2_src_debug_category, "rtmp2src", 0, "debug category for rtmp2src element"); } @@ -208,6 +221,7 @@ static void gst_rtmp2_src_init (GstRtmp2Src * self) { self->async_connect = TRUE; + self->idle_timeout = DEFAULT_IDLE_TIMEOUT; g_mutex_init (&self->lock); g_cond_init (&self->cond); @@ -306,6 +320,11 @@ gst_rtmp2_src_set_property (GObject * object, guint property_id, self->async_connect = g_value_get_boolean (value); GST_OBJECT_UNLOCK (self); break; + case PROP_IDLE_TIMEOUT: + GST_OBJECT_LOCK (self); + self->idle_timeout = g_value_get_uint (value); + GST_OBJECT_UNLOCK (self); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec); break; @@ -393,6 +412,11 @@ gst_rtmp2_src_get_property (GObject * object, guint property_id, case PROP_STATS: g_value_take_boxed (value, gst_rtmp2_src_get_stats (self)); break; + case PROP_IDLE_TIMEOUT: + GST_OBJECT_LOCK (self); + g_value_set_uint (value, self->idle_timeout); + GST_OBJECT_UNLOCK (self); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec); break; @@ -440,6 +464,8 @@ gst_rtmp2_src_start (GstBaseSrc * src) self->stream_id = 0; self->sent_header = FALSE; self->last_ts = GST_CLOCK_TIME_NONE; + self->timeout = FALSE; + self->started = FALSE; if (async) { gst_task_start (self->task); @@ -521,6 +547,17 @@ gst_rtmp2_src_unlock_stop (GstBaseSrc * src) return TRUE; } +static gboolean +on_timeout (GstRtmp2Src * self) +{ + g_mutex_lock (&self->lock); + self->timeout = TRUE; + g_cond_broadcast (&self->cond); + g_mutex_unlock (&self->lock); + + return G_SOURCE_REMOVE; +} + static GstFlowReturn gst_rtmp2_src_create (GstBaseSrc * src, guint64 offset, guint size, GstBuffer ** outbuf) @@ -529,6 +566,8 @@ gst_rtmp2_src_create (GstBaseSrc * src, guint64 offset, guint size, GstBuffer *message, *buffer; GstRtmpMeta *meta; guint32 timestamp = 0; + GSource *timeout = NULL; + GstFlowReturn ret = GST_FLOW_OK; static const guint8 flv_header_data[] = { 0x46, 0x4c, 0x56, 0x01, 0x01, 0x00, 0x00, 0x00, @@ -543,18 +582,46 @@ gst_rtmp2_src_create (GstBaseSrc * src, guint64 offset, guint size, gst_task_start (self->task); } + /* wait until GMainLoop begins running so that we can attach + * timeout source safely. + * If the task stopped meanwhile, "running" will be FALSE + * than stop_task() will wake up us as well + */ + while ((!self->started && self->running) && (!self->loop + || !g_main_loop_is_running (self->loop))) + g_cond_wait (&self->cond, &self->lock); + + GST_OBJECT_LOCK (self); + if (self->idle_timeout && self->context) { + timeout = g_timeout_source_new_seconds (self->idle_timeout); + + g_source_set_callback (timeout, (GSourceFunc) on_timeout, self, NULL); + g_source_attach (timeout, self->context); + } + GST_OBJECT_UNLOCK (self); + while (!self->message) { if (!self->running) { - g_mutex_unlock (&self->lock); - return GST_FLOW_EOS; + ret = GST_FLOW_EOS; + goto out; } if (self->flushing) { - g_mutex_unlock (&self->lock); - return GST_FLOW_FLUSHING; + ret = GST_FLOW_FLUSHING; + goto out; + } + if (self->timeout) { + GST_DEBUG_OBJECT (self, "Idle timeout, return EOS"); + ret = GST_FLOW_EOS; + goto out; } g_cond_wait (&self->cond, &self->lock); } + if (timeout) { + g_source_destroy (timeout); + g_source_unref (timeout); + } + message = self->message; self->message = NULL; g_cond_signal (&self->cond); @@ -615,6 +682,28 @@ gst_rtmp2_src_create (GstBaseSrc * src, guint64 offset, guint size, gst_buffer_unref (message); return GST_FLOW_OK; + +out: + g_mutex_unlock (&self->lock); + if (timeout) { + g_source_destroy (timeout); + g_source_unref (timeout); + } + + return ret; +} + +static gboolean +main_loop_running_cb (GstRtmp2Src * self) +{ + GST_TRACE_OBJECT (self, "Main loop running now"); + + g_mutex_lock (&self->lock); + self->started = TRUE; + g_cond_broadcast (&self->cond); + g_mutex_unlock (&self->lock); + + return G_SOURCE_REMOVE; } /* Mainloop task */ @@ -625,6 +714,7 @@ gst_rtmp2_src_task_func (gpointer user_data) GMainContext *context; GMainLoop *loop; GTask *connector; + GSource *source; GST_DEBUG_OBJECT (self, "gst_rtmp2_src_task starting"); g_mutex_lock (&self->lock); @@ -632,6 +722,13 @@ gst_rtmp2_src_task_func (gpointer user_data) context = self->context = g_main_context_new (); g_main_context_push_thread_default (context); loop = self->loop = g_main_loop_new (context, TRUE); + + source = g_idle_source_new (); + g_source_set_callback (source, (GSourceFunc) main_loop_running_cb, self, + NULL); + g_source_attach (source, self->context); + g_source_unref (source); + connector = g_task_new (self, self->cancellable, connect_task_done, NULL); g_clear_pointer (&self->stats, gst_structure_free);