uvcsink: add probe handling for live pipelines

Currently the uvcsink is only capable to run in an application
that is handling the state transitions of the pipeline properly
by checking on streaming event from the uvcsink.

This code is improving the element by adding an fakesink to
consume possible videostream flow in case the pipeline state
is not changing on hosts streamoff.

This is helpfull when using local gst-launch pipelines where
the streaming event is not monitored to change the pipelines
state.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/1304>
This commit is contained in:
Michael Grzeschik 2023-03-21 09:24:18 +01:00 committed by GStreamer Marge Bot
parent 5d82deb2c5
commit ea2764aa69
2 changed files with 248 additions and 3 deletions

View file

@ -224,6 +224,14 @@ gst_uvc_sink_parse_cur_caps (GstUvcSink * self)
return TRUE;
}
static void gst_uvc_sink_create_buffer_peer_probe (GstUvcSink * self);
static gboolean gst_uvc_sink_to_fakesink (GstUvcSink * self);
static gboolean gst_uvc_sink_to_v4l2sink (GstUvcSink * self);
static GstPadProbeReturn gst_uvc_sink_sinkpad_event_peer_probe (GstPad * pad,
GstPadProbeInfo * info, GstUvcSink * self);
static gboolean
gst_uvc_sink_query (GstPad * pad, GstObject * parent, GstQuery * query)
{
@ -237,6 +245,28 @@ gst_uvc_sink_query (GstPad * pad, GstObject * parent, GstQuery * query)
GST_DEBUG_OBJECT (self, "caps %" GST_PTR_FORMAT, self->cur_caps);
gst_query_set_caps_result (query, self->cur_caps);
if (self->caps_changed) {
gst_pad_add_probe (pad, GST_PAD_PROBE_TYPE_BLOCK |
GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM,
(GstPadProbeCallback) gst_uvc_sink_sinkpad_event_peer_probe,
self, NULL);
} else {
if (self->streamon) {
g_atomic_int_set (&self->streamon, FALSE);
gst_uvc_sink_to_v4l2sink (self);
if (!self->streaming)
GST_DEBUG_OBJECT (self, "something went wrong!");
}
if (self->streamoff) {
g_atomic_int_set (&self->streamoff, FALSE);
if (self->streaming)
GST_DEBUG_OBJECT (self, "something went wrong!");
}
}
return TRUE;
}
case GST_QUERY_ALLOCATION:
@ -280,6 +310,175 @@ gst_uvc_sink_class_init (GstUvcSinkClass * klass)
G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
}
static gboolean
gst_uvc_sink_to_fakesink (GstUvcSink * self)
{
if (gst_pad_is_linked (self->fakesinkpad)) {
GST_DEBUG_OBJECT (self, "v4l2sink already disabled");
return FALSE;
}
GST_DEBUG_OBJECT (self, "switching to fakesink");
gst_ghost_pad_set_target (GST_GHOST_PAD (self->sinkpad), self->fakesinkpad);
gst_element_set_state (GST_ELEMENT (self->fakesink), GST_STATE_PLAYING);
/* going to state READY makes v4l2sink lose its reference to the clock */
self->v4l2_clock = gst_element_get_clock (self->v4l2sink);
gst_pad_query (self->v4l2sinkpad, gst_query_new_drain ());
gst_element_set_state (GST_ELEMENT (self->v4l2sink), GST_STATE_READY);
return TRUE;
}
static gboolean
gst_uvc_sink_to_v4l2sink (GstUvcSink * self)
{
if (gst_pad_is_linked (self->v4l2sinkpad)) {
GST_DEBUG_OBJECT (self, "fakesink already disabled");
return FALSE;
}
if (self->v4l2_clock) {
gst_element_set_clock (self->v4l2sink, self->v4l2_clock);
gst_object_unref (self->v4l2_clock);
}
GST_DEBUG_OBJECT (self, "switching to v4l2sink");
gst_ghost_pad_set_target (GST_GHOST_PAD (self->sinkpad), self->v4l2sinkpad);
gst_element_set_state (GST_ELEMENT (self->v4l2sink), GST_STATE_PLAYING);
gst_pad_query (self->fakesinkpad, gst_query_new_drain ());
gst_element_set_state (GST_ELEMENT (self->fakesink), GST_STATE_READY);
return TRUE;
}
static GstPadProbeReturn
gst_uvc_sink_sinkpad_event_peer_probe (GstPad * pad,
GstPadProbeInfo * info, GstUvcSink * self)
{
GstEvent *event = GST_PAD_PROBE_INFO_EVENT (info);
GST_DEBUG_OBJECT (self, "pad is blocked now!");
if (GST_EVENT_TYPE (GST_PAD_PROBE_INFO_DATA (info)) != GST_EVENT_CAPS)
return GST_PAD_PROBE_OK;
gst_pad_remove_probe (pad, GST_PAD_PROBE_INFO_ID (info));
switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_CAPS:
{
GST_DEBUG_OBJECT (self, "caps %p", event);
if (self->streamon) {
g_atomic_int_set (&self->streamon, FALSE);
g_atomic_int_set (&self->caps_changed, FALSE);
gst_uvc_sink_to_v4l2sink (self);
if (!self->streaming)
GST_DEBUG_OBJECT (self, "something went wrong!");
}
if (self->streamoff) {
g_atomic_int_set (&self->streamoff, FALSE);
g_atomic_int_set (&self->caps_changed, FALSE);
if (self->streaming)
GST_DEBUG_OBJECT (self, "something went wrong!");
}
GST_DEBUG_OBJECT (self, "pad is unblocked now");
return GST_PAD_PROBE_REMOVE;
}
default:
return GST_PAD_PROBE_PASS;
}
return GST_PAD_PROBE_PASS;
}
static GstPadProbeReturn
gst_uvc_sink_sinkpad_buffer_peer_probe (GstPad * pad,
GstPadProbeInfo * info, GstUvcSink * self)
{
if (self->streamon || self->streamoff)
return GST_PAD_PROBE_DROP;
self->buffer_peer_probe_id = 0;
return GST_PAD_PROBE_REMOVE;
}
static GstPadProbeReturn
gst_uvc_sink_sinkpad_idle_probe (GstPad * pad,
GstPadProbeInfo * info, GstUvcSink * self)
{
if (self->streamon) {
gst_uvc_sink_create_buffer_peer_probe (self);
gst_pad_push_event (self->sinkpad, gst_event_new_reconfigure ());
}
if (self->streamoff) {
gst_uvc_sink_create_buffer_peer_probe (self);
gst_pad_push_event (self->sinkpad, gst_event_new_reconfigure ());
gst_uvc_sink_to_fakesink (self);
}
return GST_PAD_PROBE_PASS;
}
static void
gst_uvc_sink_create_buffer_peer_probe (GstUvcSink * self)
{
GstPad *peerpad = gst_pad_get_peer (self->sinkpad);
if (peerpad) {
self->buffer_peer_probe_id =
gst_pad_add_probe (peerpad, GST_PAD_PROBE_TYPE_BUFFER,
(GstPadProbeCallback) gst_uvc_sink_sinkpad_buffer_peer_probe, self,
NULL);
gst_object_unref (peerpad);
}
}
static void
gst_uvc_sink_remove_buffer_peer_probe (GstUvcSink * self)
{
GstPad *peerpad = gst_pad_get_peer (self->sinkpad);
if (peerpad && self->buffer_peer_probe_id) {
gst_pad_remove_probe (peerpad, self->buffer_peer_probe_id);
gst_object_unref (peerpad);
self->buffer_peer_probe_id = 0;
}
}
static void
gst_uvc_sink_create_idle_probe (GstUvcSink * self)
{
GstPad *peerpad = gst_pad_get_peer (self->sinkpad);
if (peerpad) {
self->idle_probe_id =
gst_pad_add_probe (peerpad, GST_PAD_PROBE_TYPE_IDLE,
(GstPadProbeCallback) gst_uvc_sink_sinkpad_idle_probe, self, NULL);
gst_object_unref (peerpad);
}
}
static void
gst_uvc_sink_remove_idle_probe (GstUvcSink * self)
{
GstPad *peerpad = gst_pad_get_peer (self->sinkpad);
if (peerpad && self->idle_probe_id) {
gst_pad_remove_probe (peerpad, self->idle_probe_id);
gst_object_unref (peerpad);
self->idle_probe_id = 0;
}
}
static gboolean
gst_uvc_sink_open (GstUvcSink * self)
{
@ -312,15 +511,29 @@ gst_uvc_sink_init (GstUvcSink * self)
g_object_set (G_OBJECT (self->v4l2sink), "async", FALSE, NULL);
gst_bin_add (GST_BIN (self), self->v4l2sink);
/* add the fakesink element */
self->fakesink = gst_element_factory_make ("fakesink", "fakesink");
if (!self->fakesink)
return;
g_object_set (G_OBJECT (self->fakesink), "sync", TRUE, NULL);
gst_bin_add (GST_BIN (self), self->fakesink);
self->v4l2sinkpad = gst_element_get_static_pad (self->v4l2sink, "sink");
g_return_if_fail (self->v4l2sinkpad != NULL);
self->fakesinkpad = gst_element_get_static_pad (self->fakesink, "sink");
g_return_if_fail (self->fakesinkpad != NULL);
/* create ghost pad sink */
self->sinkpad = gst_ghost_pad_new ("sink", self->v4l2sinkpad);
self->sinkpad = gst_ghost_pad_new ("sink", self->fakesinkpad);
gst_element_add_pad (GST_ELEMENT (self), self->sinkpad);
g_atomic_int_set (&self->streaming, FALSE);
g_atomic_int_set (&self->streamon, FALSE);
g_atomic_int_set (&self->streamoff, FALSE);
gst_pad_set_query_function (self->sinkpad, gst_uvc_sink_query);
self->cur_caps = gst_caps_new_empty ();
@ -332,6 +545,8 @@ gst_uvc_sink_dispose (GObject * object)
GstUvcSink *self = GST_UVCSINK (object);
if (self->sinkpad) {
gst_uvc_sink_remove_buffer_peer_probe (self);
gst_pad_set_active (self->sinkpad, FALSE);
gst_element_remove_pad (GST_ELEMENT (self), self->sinkpad);
self->sinkpad = NULL;
@ -351,6 +566,12 @@ gst_uvc_sink_task (gpointer data)
struct uvc_request_data resp;
gboolean ret = TRUE;
/* Since the plugin needs to be able to start immidiatly in PLAYING
state we ensure the pipeline is not blocked while we poll for
events.
*/
GST_PAD_STREAM_UNLOCK (self->sinkpad);
ret = gst_poll_wait (self->poll, timeout);
if (G_UNLIKELY (ret < 0))
return;
@ -387,14 +608,19 @@ gst_uvc_sink_task (gpointer data)
switch (event.type) {
case UVC_EVENT_STREAMON:
GST_DEBUG_OBJECT (self, "UVC_EVENT_STREAMON");
GST_STATE_LOCK (GST_ELEMENT (self));
g_atomic_int_set (&self->streaming, TRUE);
g_atomic_int_set (&self->streamon, TRUE);
GST_STATE_UNLOCK (GST_ELEMENT (self));
g_object_notify (G_OBJECT (self), "streaming");
GST_PAD_STREAM_UNLOCK (GST_PAD (self->sinkpad));
break;
case UVC_EVENT_STREAMOFF:
case UVC_EVENT_DISCONNECT:
GST_DEBUG_OBJECT (self, "UVC_EVENT_STREAMOFF");
GST_STATE_LOCK (GST_ELEMENT (self));
g_atomic_int_set (&self->streaming, FALSE);
g_atomic_int_set (&self->streamoff, TRUE);
GST_STATE_UNLOCK (GST_ELEMENT (self));
g_object_notify (G_OBJECT (self), "streaming");
break;
case UVC_EVENT_SETUP:
@ -418,14 +644,18 @@ gst_uvc_sink_task (gpointer data)
GST_DEBUG_OBJECT (self, "UVC_EVENT_DATA");
uvc_events_process_data (self, &uvc_event->data);
if (self->control == UVC_VS_COMMIT_CONTROL) {
GstCaps *caps;
GstCaps *caps, *prev_caps;
prev_caps = gst_caps_copy (self->cur_caps);
gst_uvc_sink_parse_cur_caps (self);
caps = gst_caps_copy (self->cur_caps);
gst_clear_caps (&self->cur_caps);
self->cur_caps =
gst_caps_intersect_full (self->probed_caps, caps,
GST_CAPS_INTERSECT_FIRST);
if (!gst_caps_is_equal (self->probed_caps, prev_caps))
self->caps_changed = !gst_caps_is_equal (self->cur_caps, prev_caps);
gst_caps_unref (prev_caps);
gst_caps_unref (caps);
}
break;
@ -626,8 +856,11 @@ gst_uvc_sink_change_state (GstElement * element, GstStateChange transition)
return GST_STATE_CHANGE_FAILURE;
break;
case GST_STATE_CHANGE_READY_TO_PAUSED:
gst_uvc_sink_create_idle_probe (self);
break;
case GST_STATE_CHANGE_PAUSED_TO_READY:
gst_element_sync_state_with_parent (GST_ELEMENT (self->fakesink));
gst_uvc_sink_remove_idle_probe (self);
break;
case GST_STATE_CHANGE_READY_TO_NULL:
if (!gst_uvc_sink_unwatch (self))

View file

@ -25,8 +25,10 @@ GST_ELEMENT_REGISTER_DECLARE (uvcsink);
struct _GstUvcSink
{
GstBin bin;
GstElement *fakesink;
GstElement *v4l2sink;
GstPad *sinkpad;
GstPad *fakesinkpad;
GstPad *v4l2sinkpad;
/* streaming status */
@ -51,6 +53,16 @@ struct _GstUvcSink
struct uvc_streaming_control commit;
int control;
/* probes */
int buffer_peer_probe_id;
int idle_probe_id;
GstClock *v4l2_clock;
int caps_changed;
int streamon;
int streamoff;
};
#define UVCSINK_MSG_LOCK(v) g_mutex_lock(&(v)->msg_lock)