diff --git a/gst/rtmp2/gstrtmp2src.c b/gst/rtmp2/gstrtmp2src.c index 4ed4f98dbf..dff4eac81a 100644 --- a/gst/rtmp2/gstrtmp2src.c +++ b/gst/rtmp2/gstrtmp2src.c @@ -60,6 +60,7 @@ typedef struct GstRtmpLocation location; gboolean async_connect; GstStructure *stats; + guint idle_timeout; /* If both self->lock and OBJECT_LOCK are needed, * self->lock must be taken first */ @@ -67,6 +68,8 @@ typedef struct GCond cond; gboolean running, flushing; + gboolean timeout; + gboolean started; GstTask *task; GRecMutex task_lock; @@ -133,8 +136,11 @@ enum PROP_FLASH_VERSION, PROP_ASYNC_CONNECT, PROP_STATS, + PROP_IDLE_TIMEOUT, }; +#define DEFAULT_IDLE_TIMEOUT 0 + /* pad templates */ static GstStaticPadTemplate gst_rtmp2_src_src_template = @@ -200,6 +206,13 @@ gst_rtmp2_src_class_init (GstRtmp2SrcClass * klass) g_param_spec_boxed ("stats", "Stats", "Retrieve a statistics structure", GST_TYPE_STRUCTURE, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS)); + g_object_class_install_property (gobject_class, PROP_IDLE_TIMEOUT, + g_param_spec_uint ("idle-timeout", "Idle timeout", + "The maximum allowed time in seconds for valid packets not to arrive " + "from the peer (0 = no timeout)", + 0, G_MAXUINT, DEFAULT_IDLE_TIMEOUT, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + GST_DEBUG_CATEGORY_INIT (gst_rtmp2_src_debug_category, "rtmp2src", 0, "debug category for rtmp2src element"); } @@ -208,6 +221,7 @@ static void gst_rtmp2_src_init (GstRtmp2Src * self) { self->async_connect = TRUE; + self->idle_timeout = DEFAULT_IDLE_TIMEOUT; g_mutex_init (&self->lock); g_cond_init (&self->cond); @@ -306,6 +320,11 @@ gst_rtmp2_src_set_property (GObject * object, guint property_id, self->async_connect = g_value_get_boolean (value); GST_OBJECT_UNLOCK (self); break; + case PROP_IDLE_TIMEOUT: + GST_OBJECT_LOCK (self); + self->idle_timeout = g_value_get_uint (value); + GST_OBJECT_UNLOCK (self); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec); break; @@ -393,6 +412,11 @@ gst_rtmp2_src_get_property (GObject * object, guint property_id, case PROP_STATS: g_value_take_boxed (value, gst_rtmp2_src_get_stats (self)); break; + case PROP_IDLE_TIMEOUT: + GST_OBJECT_LOCK (self); + g_value_set_uint (value, self->idle_timeout); + GST_OBJECT_UNLOCK (self); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec); break; @@ -440,6 +464,8 @@ gst_rtmp2_src_start (GstBaseSrc * src) self->stream_id = 0; self->sent_header = FALSE; self->last_ts = GST_CLOCK_TIME_NONE; + self->timeout = FALSE; + self->started = FALSE; if (async) { gst_task_start (self->task); @@ -521,6 +547,17 @@ gst_rtmp2_src_unlock_stop (GstBaseSrc * src) return TRUE; } +static gboolean +on_timeout (GstRtmp2Src * self) +{ + g_mutex_lock (&self->lock); + self->timeout = TRUE; + g_cond_broadcast (&self->cond); + g_mutex_unlock (&self->lock); + + return G_SOURCE_REMOVE; +} + static GstFlowReturn gst_rtmp2_src_create (GstBaseSrc * src, guint64 offset, guint size, GstBuffer ** outbuf) @@ -529,6 +566,8 @@ gst_rtmp2_src_create (GstBaseSrc * src, guint64 offset, guint size, GstBuffer *message, *buffer; GstRtmpMeta *meta; guint32 timestamp = 0; + GSource *timeout = NULL; + GstFlowReturn ret = GST_FLOW_OK; static const guint8 flv_header_data[] = { 0x46, 0x4c, 0x56, 0x01, 0x01, 0x00, 0x00, 0x00, @@ -543,18 +582,46 @@ gst_rtmp2_src_create (GstBaseSrc * src, guint64 offset, guint size, gst_task_start (self->task); } + /* wait until GMainLoop begins running so that we can attach + * timeout source safely. + * If the task stopped meanwhile, "running" will be FALSE + * than stop_task() will wake up us as well + */ + while ((!self->started && self->running) && (!self->loop + || !g_main_loop_is_running (self->loop))) + g_cond_wait (&self->cond, &self->lock); + + GST_OBJECT_LOCK (self); + if (self->idle_timeout && self->context) { + timeout = g_timeout_source_new_seconds (self->idle_timeout); + + g_source_set_callback (timeout, (GSourceFunc) on_timeout, self, NULL); + g_source_attach (timeout, self->context); + } + GST_OBJECT_UNLOCK (self); + while (!self->message) { if (!self->running) { - g_mutex_unlock (&self->lock); - return GST_FLOW_EOS; + ret = GST_FLOW_EOS; + goto out; } if (self->flushing) { - g_mutex_unlock (&self->lock); - return GST_FLOW_FLUSHING; + ret = GST_FLOW_FLUSHING; + goto out; + } + if (self->timeout) { + GST_DEBUG_OBJECT (self, "Idle timeout, return EOS"); + ret = GST_FLOW_EOS; + goto out; } g_cond_wait (&self->cond, &self->lock); } + if (timeout) { + g_source_destroy (timeout); + g_source_unref (timeout); + } + message = self->message; self->message = NULL; g_cond_signal (&self->cond); @@ -615,6 +682,28 @@ gst_rtmp2_src_create (GstBaseSrc * src, guint64 offset, guint size, gst_buffer_unref (message); return GST_FLOW_OK; + +out: + g_mutex_unlock (&self->lock); + if (timeout) { + g_source_destroy (timeout); + g_source_unref (timeout); + } + + return ret; +} + +static gboolean +main_loop_running_cb (GstRtmp2Src * self) +{ + GST_TRACE_OBJECT (self, "Main loop running now"); + + g_mutex_lock (&self->lock); + self->started = TRUE; + g_cond_broadcast (&self->cond); + g_mutex_unlock (&self->lock); + + return G_SOURCE_REMOVE; } /* Mainloop task */ @@ -625,6 +714,7 @@ gst_rtmp2_src_task_func (gpointer user_data) GMainContext *context; GMainLoop *loop; GTask *connector; + GSource *source; GST_DEBUG_OBJECT (self, "gst_rtmp2_src_task starting"); g_mutex_lock (&self->lock); @@ -632,6 +722,13 @@ gst_rtmp2_src_task_func (gpointer user_data) context = self->context = g_main_context_new (); g_main_context_push_thread_default (context); loop = self->loop = g_main_loop_new (context, TRUE); + + source = g_idle_source_new (); + g_source_set_callback (source, (GSourceFunc) main_loop_running_cb, self, + NULL); + g_source_attach (source, self->context); + g_source_unref (source); + connector = g_task_new (self, self->cancellable, connect_task_done, NULL); g_clear_pointer (&self->stats, gst_structure_free);