rtmp2src: add 'no-eof-is-error' property

There is currently no way for applications to know if the stream has
been properly terminated by the server or if the network connection
was disconnected as EOS is sent in both cases.

Adding a property so connection errors can be reported as errors
allowing applications to distinguish between both scenarios.

Fix #2828

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/5115>
This commit is contained in:
Guillaume Desmottes 2023-07-27 17:14:23 +02:00 committed by GStreamer Marge Bot
parent 67ec72b7cd
commit 501e53b033
2 changed files with 53 additions and 1 deletions

View file

@ -228879,6 +228879,18 @@
"type": "guint", "type": "guint",
"writable": true "writable": true
}, },
"no-eof-is-error": {
"blurb": "If set, an error is raised if the connection is closed without receiving an EOF RTMP message first. If not set, those are reported using EOS",
"conditionally-available": false,
"construct": false,
"construct-only": false,
"controllable": false,
"default": "false",
"mutable": "null",
"readable": true,
"type": "gboolean",
"writable": true
},
"stats": { "stats": {
"blurb": "Retrieve a statistics structure", "blurb": "Retrieve a statistics structure",
"conditionally-available": false, "conditionally-available": false,

View file

@ -62,6 +62,7 @@ typedef struct
gboolean async_connect; gboolean async_connect;
GstStructure *stats; GstStructure *stats;
guint idle_timeout; guint idle_timeout;
gboolean no_eof_is_error;
/* If both self->lock and OBJECT_LOCK are needed, /* If both self->lock and OBJECT_LOCK are needed,
* self->lock must be taken first */ * self->lock must be taken first */
@ -71,6 +72,8 @@ typedef struct
gboolean running, flushing; gboolean running, flushing;
gboolean timeout; gboolean timeout;
gboolean started; gboolean started;
/* TRUE if there was an error with the connection to the RTMP server */
gboolean connection_error;
GstTask *task; GstTask *task;
GRecMutex task_lock; GRecMutex task_lock;
@ -139,6 +142,7 @@ enum
PROP_ASYNC_CONNECT, PROP_ASYNC_CONNECT,
PROP_STATS, PROP_STATS,
PROP_IDLE_TIMEOUT, PROP_IDLE_TIMEOUT,
PROP_NO_EOF_IS_ERROR,
}; };
#define DEFAULT_IDLE_TIMEOUT 0 #define DEFAULT_IDLE_TIMEOUT 0
@ -218,6 +222,21 @@ gst_rtmp2_src_class_init (GstRtmp2SrcClass * klass)
0, G_MAXUINT, DEFAULT_IDLE_TIMEOUT, 0, G_MAXUINT, DEFAULT_IDLE_TIMEOUT,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
/**
* GstRtmp2Src:no-eof-is-error:
*
* If set, an error is raised if the connection is closed without receiving an EOF RTMP message first.
" If not set, those are reported using EOS.
*
* Since: 1.24
*/
g_object_class_install_property (gobject_class, PROP_NO_EOF_IS_ERROR,
g_param_spec_boolean ("no-eof-is-error",
"No EOF is error",
"If set, an error is raised if the connection is closed without receiving an EOF RTMP message first. "
"If not set, those are reported using EOS", FALSE,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
GST_DEBUG_CATEGORY_INIT (gst_rtmp2_src_debug_category, "rtmp2src", 0, GST_DEBUG_CATEGORY_INIT (gst_rtmp2_src_debug_category, "rtmp2src", 0,
"debug category for rtmp2src element"); "debug category for rtmp2src element");
} }
@ -330,6 +349,11 @@ gst_rtmp2_src_set_property (GObject * object, guint property_id,
self->idle_timeout = g_value_get_uint (value); self->idle_timeout = g_value_get_uint (value);
GST_OBJECT_UNLOCK (self); GST_OBJECT_UNLOCK (self);
break; break;
case PROP_NO_EOF_IS_ERROR:
GST_OBJECT_LOCK (self);
self->no_eof_is_error = g_value_get_boolean (value);
GST_OBJECT_UNLOCK (self);
break;
default: default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec); G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
break; break;
@ -422,6 +446,11 @@ gst_rtmp2_src_get_property (GObject * object, guint property_id,
g_value_set_uint (value, self->idle_timeout); g_value_set_uint (value, self->idle_timeout);
GST_OBJECT_UNLOCK (self); GST_OBJECT_UNLOCK (self);
break; break;
case PROP_NO_EOF_IS_ERROR:
GST_OBJECT_LOCK (self);
g_value_set_boolean (value, self->no_eof_is_error);
GST_OBJECT_UNLOCK (self);
break;
default: default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec); G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
break; break;
@ -471,6 +500,7 @@ gst_rtmp2_src_start (GstBaseSrc * src)
self->last_ts = GST_CLOCK_TIME_NONE; self->last_ts = GST_CLOCK_TIME_NONE;
self->timeout = FALSE; self->timeout = FALSE;
self->started = FALSE; self->started = FALSE;
self->connection_error = FALSE;
if (async) { if (async) {
gst_task_start (self->task); gst_task_start (self->task);
@ -607,7 +637,15 @@ gst_rtmp2_src_create (GstBaseSrc * src, guint64 offset, guint size,
while (!self->message) { while (!self->message) {
if (!self->running) { if (!self->running) {
if (self->no_eof_is_error && self->connection_error) {
GST_DEBUG_OBJECT (self,
"stopped because of connection error, return ERROR");
ret = GST_FLOW_ERROR;
} else {
GST_DEBUG_OBJECT (self, "stopped, return EOS");
ret = GST_FLOW_EOS; ret = GST_FLOW_EOS;
}
goto out; goto out;
} }
if (self->flushing) { if (self->flushing) {
@ -926,6 +964,7 @@ error_callback (GstRtmpConnection * connection, const GError * error,
} else if (self->loop) { } else if (self->loop) {
GST_INFO_OBJECT (self, "Connection error: %s %d %s", GST_INFO_OBJECT (self, "Connection error: %s %d %s",
g_quark_to_string (error->domain), error->code, error->message); g_quark_to_string (error->domain), error->code, error->message);
self->connection_error = TRUE;
stop_task (self); stop_task (self);
} }
g_mutex_unlock (&self->lock); g_mutex_unlock (&self->lock);
@ -999,6 +1038,7 @@ connect_task_done (GObject * object, GAsyncResult * result, gpointer user_data)
G_CALLBACK (control_callback), self, 0); G_CALLBACK (control_callback), self, 0);
} else { } else {
send_connect_error (self, error); send_connect_error (self, error);
self->connection_error = TRUE;
stop_task (self); stop_task (self);
g_error_free (error); g_error_free (error);
} }