rtmp2src: Add idle-timeout property

Add new property to signalling that there is no incoming data
from peer. This can be useful if users want to stop the streaming
when the connection is alive but no packet is arriving.
This commit is contained in:
Seungha Yang 2020-03-27 15:40:00 +09:00 committed by GStreamer Merge Bot
parent 8da177c0bf
commit 7f5347a664

View file

@ -60,6 +60,7 @@ typedef struct
GstRtmpLocation location;
gboolean async_connect;
GstStructure *stats;
guint idle_timeout;
/* If both self->lock and OBJECT_LOCK are needed,
* self->lock must be taken first */
@ -67,6 +68,8 @@ typedef struct
GCond cond;
gboolean running, flushing;
gboolean timeout;
gboolean started;
GstTask *task;
GRecMutex task_lock;
@ -133,8 +136,11 @@ enum
PROP_FLASH_VERSION,
PROP_ASYNC_CONNECT,
PROP_STATS,
PROP_IDLE_TIMEOUT,
};
#define DEFAULT_IDLE_TIMEOUT 0
/* pad templates */
static GstStaticPadTemplate gst_rtmp2_src_src_template =
@ -200,6 +206,13 @@ gst_rtmp2_src_class_init (GstRtmp2SrcClass * klass)
g_param_spec_boxed ("stats", "Stats", "Retrieve a statistics structure",
GST_TYPE_STRUCTURE, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_IDLE_TIMEOUT,
g_param_spec_uint ("idle-timeout", "Idle timeout",
"The maximum allowed time in seconds for valid packets not to arrive "
"from the peer (0 = no timeout)",
0, G_MAXUINT, DEFAULT_IDLE_TIMEOUT,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
GST_DEBUG_CATEGORY_INIT (gst_rtmp2_src_debug_category, "rtmp2src", 0,
"debug category for rtmp2src element");
}
@ -208,6 +221,7 @@ static void
gst_rtmp2_src_init (GstRtmp2Src * self)
{
self->async_connect = TRUE;
self->idle_timeout = DEFAULT_IDLE_TIMEOUT;
g_mutex_init (&self->lock);
g_cond_init (&self->cond);
@ -306,6 +320,11 @@ gst_rtmp2_src_set_property (GObject * object, guint property_id,
self->async_connect = g_value_get_boolean (value);
GST_OBJECT_UNLOCK (self);
break;
case PROP_IDLE_TIMEOUT:
GST_OBJECT_LOCK (self);
self->idle_timeout = g_value_get_uint (value);
GST_OBJECT_UNLOCK (self);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
break;
@ -393,6 +412,11 @@ gst_rtmp2_src_get_property (GObject * object, guint property_id,
case PROP_STATS:
g_value_take_boxed (value, gst_rtmp2_src_get_stats (self));
break;
case PROP_IDLE_TIMEOUT:
GST_OBJECT_LOCK (self);
g_value_set_uint (value, self->idle_timeout);
GST_OBJECT_UNLOCK (self);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
break;
@ -440,6 +464,8 @@ gst_rtmp2_src_start (GstBaseSrc * src)
self->stream_id = 0;
self->sent_header = FALSE;
self->last_ts = GST_CLOCK_TIME_NONE;
self->timeout = FALSE;
self->started = FALSE;
if (async) {
gst_task_start (self->task);
@ -521,6 +547,17 @@ gst_rtmp2_src_unlock_stop (GstBaseSrc * src)
return TRUE;
}
static gboolean
on_timeout (GstRtmp2Src * self)
{
g_mutex_lock (&self->lock);
self->timeout = TRUE;
g_cond_broadcast (&self->cond);
g_mutex_unlock (&self->lock);
return G_SOURCE_REMOVE;
}
static GstFlowReturn
gst_rtmp2_src_create (GstBaseSrc * src, guint64 offset, guint size,
GstBuffer ** outbuf)
@ -529,6 +566,8 @@ gst_rtmp2_src_create (GstBaseSrc * src, guint64 offset, guint size,
GstBuffer *message, *buffer;
GstRtmpMeta *meta;
guint32 timestamp = 0;
GSource *timeout = NULL;
GstFlowReturn ret = GST_FLOW_OK;
static const guint8 flv_header_data[] = {
0x46, 0x4c, 0x56, 0x01, 0x01, 0x00, 0x00, 0x00,
@ -543,18 +582,46 @@ gst_rtmp2_src_create (GstBaseSrc * src, guint64 offset, guint size,
gst_task_start (self->task);
}
/* wait until GMainLoop begins running so that we can attach
* timeout source safely.
* If the task stopped meanwhile, "running" will be FALSE
* than stop_task() will wake up us as well
*/
while ((!self->started && self->running) && (!self->loop
|| !g_main_loop_is_running (self->loop)))
g_cond_wait (&self->cond, &self->lock);
GST_OBJECT_LOCK (self);
if (self->idle_timeout && self->context) {
timeout = g_timeout_source_new_seconds (self->idle_timeout);
g_source_set_callback (timeout, (GSourceFunc) on_timeout, self, NULL);
g_source_attach (timeout, self->context);
}
GST_OBJECT_UNLOCK (self);
while (!self->message) {
if (!self->running) {
g_mutex_unlock (&self->lock);
return GST_FLOW_EOS;
ret = GST_FLOW_EOS;
goto out;
}
if (self->flushing) {
g_mutex_unlock (&self->lock);
return GST_FLOW_FLUSHING;
ret = GST_FLOW_FLUSHING;
goto out;
}
if (self->timeout) {
GST_DEBUG_OBJECT (self, "Idle timeout, return EOS");
ret = GST_FLOW_EOS;
goto out;
}
g_cond_wait (&self->cond, &self->lock);
}
if (timeout) {
g_source_destroy (timeout);
g_source_unref (timeout);
}
message = self->message;
self->message = NULL;
g_cond_signal (&self->cond);
@ -615,6 +682,28 @@ gst_rtmp2_src_create (GstBaseSrc * src, guint64 offset, guint size,
gst_buffer_unref (message);
return GST_FLOW_OK;
out:
g_mutex_unlock (&self->lock);
if (timeout) {
g_source_destroy (timeout);
g_source_unref (timeout);
}
return ret;
}
static gboolean
main_loop_running_cb (GstRtmp2Src * self)
{
GST_TRACE_OBJECT (self, "Main loop running now");
g_mutex_lock (&self->lock);
self->started = TRUE;
g_cond_broadcast (&self->cond);
g_mutex_unlock (&self->lock);
return G_SOURCE_REMOVE;
}
/* Mainloop task */
@ -625,6 +714,7 @@ gst_rtmp2_src_task_func (gpointer user_data)
GMainContext *context;
GMainLoop *loop;
GTask *connector;
GSource *source;
GST_DEBUG_OBJECT (self, "gst_rtmp2_src_task starting");
g_mutex_lock (&self->lock);
@ -632,6 +722,13 @@ gst_rtmp2_src_task_func (gpointer user_data)
context = self->context = g_main_context_new ();
g_main_context_push_thread_default (context);
loop = self->loop = g_main_loop_new (context, TRUE);
source = g_idle_source_new ();
g_source_set_callback (source, (GSourceFunc) main_loop_running_cb, self,
NULL);
g_source_attach (source, self->context);
g_source_unref (source);
connector = g_task_new (self, self->cancellable, connect_task_done, NULL);
g_clear_pointer (&self->stats, gst_structure_free);