rtpbin: add request-jitterbuffer signal

This can be used to pass the threadsharing jitterbuffer from
gst-plugins-rs for example.
This commit is contained in:
Mathieu Duponchelle 2017-12-19 18:23:16 +01:00 committed by Sebastian Dröge
parent 5ffd733317
commit b5e414cdc2
2 changed files with 134 additions and 43 deletions

View file

@ -85,8 +85,12 @@
* An AUX receiver has 1 src_\%u pad that much match the sessionid in the signal * An AUX receiver has 1 src_\%u pad that much match the sessionid in the signal
* and 1 or more sink_\%u pads. A session will be made for each sink_\%u pad * and 1 or more sink_\%u pads. A session will be made for each sink_\%u pad
* when the corresponding recv_rtp_sink_\%u pad is requested on #GstRtpBin. * when the corresponding recv_rtp_sink_\%u pad is requested on #GstRtpBin.
* The #GstRtpBin::request-jitterbuffer signal can be used to provide a custom
* element to perform arrival time smoothing, reordering and optionally packet
* loss detection and retransmission requests.
* *
* ## Example pipelines * ## Example pipelines
*
* |[ * |[
* gst-launch-1.0 udpsrc port=5000 caps="application/x-rtp, ..." ! .recv_rtp_sink_0 \ * gst-launch-1.0 udpsrc port=5000 caps="application/x-rtp, ..." ! .recv_rtp_sink_0 \
* rtpbin ! rtptheoradepay ! theoradec ! xvimagesink * rtpbin ! rtptheoradepay ! theoradec ! xvimagesink
@ -274,6 +278,8 @@ enum
SIGNAL_REQUEST_FEC_DECODER, SIGNAL_REQUEST_FEC_DECODER,
SIGNAL_REQUEST_FEC_ENCODER, SIGNAL_REQUEST_FEC_ENCODER,
SIGNAL_REQUEST_JITTERBUFFER,
SIGNAL_NEW_JITTERBUFFER, SIGNAL_NEW_JITTERBUFFER,
SIGNAL_NEW_STORAGE, SIGNAL_NEW_STORAGE,
@ -383,6 +389,8 @@ complete_session_receiver (GstRtpBin * rtpbin, GstRtpBinSession * session,
guint sessid); guint sessid);
static GstPad *complete_session_rtcp (GstRtpBin * rtpbin, static GstPad *complete_session_rtcp (GstRtpBin * rtpbin,
GstRtpBinSession * session, guint sessid); GstRtpBinSession * session, guint sessid);
static GstElement *session_request_element (GstRtpBinSession * session,
guint signal);
/* Manages the RTP stream for one SSRC. /* Manages the RTP stream for one SSRC.
* *
@ -990,7 +998,8 @@ gst_rtp_bin_clear_pt_map (GstRtpBin * bin)
GstRtpBinStream *stream = (GstRtpBinStream *) streams->data; GstRtpBinStream *stream = (GstRtpBinStream *) streams->data;
GST_DEBUG_OBJECT (bin, "clearing stream %p", stream); GST_DEBUG_OBJECT (bin, "clearing stream %p", stream);
g_signal_emit_by_name (stream->buffer, "clear-pt-map", NULL); if (g_signal_lookup ("clear-pt-map", G_OBJECT_TYPE (stream->buffer)) != 0)
g_signal_emit_by_name (stream->buffer, "clear-pt-map", NULL);
if (stream->demux) if (stream->demux)
g_signal_emit_by_name (stream->demux, "clear-pt-map", NULL); g_signal_emit_by_name (stream->demux, "clear-pt-map", NULL);
} }
@ -1089,6 +1098,12 @@ gst_rtp_bin_request_decoder (GstRtpBin * bin, guint session_id)
return NULL; return NULL;
} }
static GstElement *
gst_rtp_bin_request_jitterbuffer (GstRtpBin * bin, guint session_id)
{
return gst_element_factory_make ("rtpjitterbuffer", NULL);
}
static void static void
gst_rtp_bin_propagate_property_to_jitterbuffer (GstRtpBin * bin, gst_rtp_bin_propagate_property_to_jitterbuffer (GstRtpBin * bin,
const gchar * name, const GValue * value) const gchar * name, const GValue * value)
@ -1102,8 +1117,14 @@ gst_rtp_bin_propagate_property_to_jitterbuffer (GstRtpBin * bin,
GST_RTP_SESSION_LOCK (session); GST_RTP_SESSION_LOCK (session);
for (streams = session->streams; streams; streams = g_slist_next (streams)) { for (streams = session->streams; streams; streams = g_slist_next (streams)) {
GstRtpBinStream *stream = (GstRtpBinStream *) streams->data; GstRtpBinStream *stream = (GstRtpBinStream *) streams->data;
GObjectClass *jb_class;
g_object_set_property (G_OBJECT (stream->buffer), name, value); jb_class = G_OBJECT_GET_CLASS (G_OBJECT (stream->buffer));
if (g_object_class_find_property (jb_class, name))
g_object_set_property (G_OBJECT (stream->buffer), name, value);
else
GST_WARNING_OBJECT (bin,
"Stream jitterbuffer does not expose property %s", name);
} }
GST_RTP_SESSION_UNLOCK (session); GST_RTP_SESSION_UNLOCK (session);
} }
@ -1235,6 +1256,15 @@ stream_set_ts_offset (GstRtpBin * bin, GstRtpBinStream * stream,
gboolean allow_positive_ts_offset) gboolean allow_positive_ts_offset)
{ {
gint64 prev_ts_offset; gint64 prev_ts_offset;
GObjectClass *jb_class;
jb_class = G_OBJECT_GET_CLASS (G_OBJECT (stream->buffer));
if (!g_object_class_find_property (jb_class, "ts-offset")) {
GST_LOG_OBJECT (bin,
"stream's jitterbuffer does not expose ts-offset property");
return;
}
g_object_get (stream->buffer, "ts-offset", &prev_ts_offset, NULL); g_object_get (stream->buffer, "ts-offset", &prev_ts_offset, NULL);
@ -1693,13 +1723,15 @@ create_stream (GstRtpBinSession * session, guint32 ssrc)
GstRtpBinStream *stream; GstRtpBinStream *stream;
GstRtpBin *rtpbin; GstRtpBin *rtpbin;
GstState target; GstState target;
GObjectClass *jb_class;
rtpbin = session->bin; rtpbin = session->bin;
if (g_slist_length (session->streams) >= rtpbin->max_streams) if (g_slist_length (session->streams) >= rtpbin->max_streams)
goto max_streams; goto max_streams;
if (!(buffer = gst_element_factory_make ("rtpjitterbuffer", NULL))) if (!(buffer =
session_request_element (session, SIGNAL_REQUEST_JITTERBUFFER)))
goto no_jitterbuffer; goto no_jitterbuffer;
if (!rtpbin->ignore_pt) { if (!rtpbin->ignore_pt) {
@ -1711,7 +1743,7 @@ create_stream (GstRtpBinSession * session, guint32 ssrc)
stream->ssrc = ssrc; stream->ssrc = ssrc;
stream->bin = rtpbin; stream->bin = rtpbin;
stream->session = session; stream->session = session;
stream->buffer = buffer; stream->buffer = gst_object_ref (buffer);
stream->demux = demux; stream->demux = demux;
stream->have_sync = FALSE; stream->have_sync = FALSE;
@ -1721,28 +1753,44 @@ create_stream (GstRtpBinSession * session, guint32 ssrc)
stream->clock_base = -100 * GST_SECOND; stream->clock_base = -100 * GST_SECOND;
session->streams = g_slist_prepend (session->streams, stream); session->streams = g_slist_prepend (session->streams, stream);
/* provide clock_rate to the jitterbuffer when needed */ jb_class = G_OBJECT_GET_CLASS (G_OBJECT (buffer));
stream->buffer_ptreq_sig = g_signal_connect (buffer, "request-pt-map",
(GCallback) pt_map_requested, session); if (g_signal_lookup ("request-pt-map", G_OBJECT_TYPE (buffer)) != 0) {
stream->buffer_ntpstop_sig = g_signal_connect (buffer, "on-npt-stop", /* provide clock_rate to the jitterbuffer when needed */
(GCallback) on_npt_stop, stream); stream->buffer_ptreq_sig = g_signal_connect (buffer, "request-pt-map",
(GCallback) pt_map_requested, session);
}
if (g_signal_lookup ("on-npt-stop", G_OBJECT_TYPE (buffer)) != 0) {
stream->buffer_ntpstop_sig = g_signal_connect (buffer, "on-npt-stop",
(GCallback) on_npt_stop, stream);
}
g_object_set_data (G_OBJECT (buffer), "GstRTPBin.session", session); g_object_set_data (G_OBJECT (buffer), "GstRTPBin.session", session);
g_object_set_data (G_OBJECT (buffer), "GstRTPBin.stream", stream); g_object_set_data (G_OBJECT (buffer), "GstRTPBin.stream", stream);
/* configure latency and packet lost */ /* configure latency and packet lost */
g_object_set (buffer, "latency", rtpbin->latency_ms, NULL); g_object_set (buffer, "latency", rtpbin->latency_ms, NULL);
g_object_set (buffer, "drop-on-latency", rtpbin->drop_on_latency, NULL);
g_object_set (buffer, "do-lost", rtpbin->do_lost, NULL); if (g_object_class_find_property (jb_class, "drop-on-latency"))
g_object_set (buffer, "mode", rtpbin->buffer_mode, NULL); g_object_set (buffer, "drop-on-latency", rtpbin->drop_on_latency, NULL);
g_object_set (buffer, "do-retransmission", rtpbin->do_retransmission, NULL); if (g_object_class_find_property (jb_class, "do-lost"))
g_object_set (buffer, "max-rtcp-rtp-time-diff", g_object_set (buffer, "do-lost", rtpbin->do_lost, NULL);
rtpbin->max_rtcp_rtp_time_diff, NULL); if (g_object_class_find_property (jb_class, "mode"))
g_object_set (buffer, "max-dropout-time", rtpbin->max_dropout_time, g_object_set (buffer, "mode", rtpbin->buffer_mode, NULL);
"max-misorder-time", rtpbin->max_misorder_time, NULL); if (g_object_class_find_property (jb_class, "do-retransmission"))
g_object_set (buffer, "rfc7273-sync", rtpbin->rfc7273_sync, NULL); g_object_set (buffer, "do-retransmission", rtpbin->do_retransmission, NULL);
g_object_set (buffer, "max-ts-offset-adjustment", if (g_object_class_find_property (jb_class, "max-rtcp-rtp-time-diff"))
rtpbin->max_ts_offset_adjustment, NULL); g_object_set (buffer, "max-rtcp-rtp-time-diff",
rtpbin->max_rtcp_rtp_time_diff, NULL);
if (g_object_class_find_property (jb_class, "max-dropout-time"))
g_object_set (buffer, "max-dropout-time", rtpbin->max_dropout_time, NULL);
if (g_object_class_find_property (jb_class, "max-misorder-time"))
g_object_set (buffer, "max-dropout-time", rtpbin->max_misorder_time, NULL);
if (g_object_class_find_property (jb_class, "rfc7273-sync"))
g_object_set (buffer, "rfc7273-sync", rtpbin->rfc7273_sync, NULL);
if (g_object_class_find_property (jb_class, "max-ts-offset-adjustment"))
g_object_set (buffer, "max-ts-offset-adjustment",
rtpbin->max_ts_offset_adjustment, NULL);
/* need to sink the jitterbufer or otherwise signal handlers from bindings will /* need to sink the jitterbufer or otherwise signal handlers from bindings will
* take ownership of it and we don't own it anymore */ * take ownership of it and we don't own it anymore */
@ -1752,7 +1800,6 @@ create_stream (GstRtpBinSession * session, guint32 ssrc)
if (!rtpbin->ignore_pt) if (!rtpbin->ignore_pt)
gst_bin_add (GST_BIN_CAST (rtpbin), demux); gst_bin_add (GST_BIN_CAST (rtpbin), demux);
gst_bin_add (GST_BIN_CAST (rtpbin), buffer);
/* unref the jitterbuffer again, the bin has a reference now and /* unref the jitterbuffer again, the bin has a reference now and
* we don't need it anymore */ * we don't need it anymore */
@ -1766,9 +1813,12 @@ create_stream (GstRtpBinSession * session, guint32 ssrc)
if (rtpbin->buffering) { if (rtpbin->buffering) {
guint64 last_out; guint64 last_out;
GST_INFO_OBJECT (rtpbin, if (g_signal_lookup ("set-active", G_OBJECT_TYPE (buffer)) != 0) {
"bin is buffering, set jitterbuffer as not active"); GST_INFO_OBJECT (rtpbin,
g_signal_emit_by_name (buffer, "set-active", FALSE, (gint64) 0, &last_out); "bin is buffering, set jitterbuffer as not active");
g_signal_emit_by_name (buffer, "set-active", FALSE, (gint64) 0,
&last_out);
}
} }
@ -1817,24 +1867,27 @@ free_stream (GstRtpBinStream * stream, GstRtpBin * bin)
g_signal_handler_disconnect (stream->demux, stream->demux_ptreq_sig); g_signal_handler_disconnect (stream->demux, stream->demux_ptreq_sig);
g_signal_handler_disconnect (stream->demux, stream->demux_ptchange_sig); g_signal_handler_disconnect (stream->demux, stream->demux_ptchange_sig);
} }
g_signal_handler_disconnect (stream->buffer, stream->buffer_handlesync_sig);
g_signal_handler_disconnect (stream->buffer, stream->buffer_ptreq_sig); if (stream->buffer_handlesync_sig)
g_signal_handler_disconnect (stream->buffer, stream->buffer_ntpstop_sig); g_signal_handler_disconnect (stream->buffer, stream->buffer_handlesync_sig);
if (stream->buffer_ptreq_sig)
g_signal_handler_disconnect (stream->buffer, stream->buffer_ptreq_sig);
if (stream->buffer_ntpstop_sig)
g_signal_handler_disconnect (stream->buffer, stream->buffer_ntpstop_sig);
gst_object_unref (stream->buffer);
if (stream->demux) if (stream->demux)
gst_element_set_locked_state (stream->demux, TRUE); gst_element_set_locked_state (stream->demux, TRUE);
gst_element_set_locked_state (stream->buffer, TRUE);
if (stream->demux) if (stream->demux)
gst_element_set_state (stream->demux, GST_STATE_NULL); gst_element_set_state (stream->demux, GST_STATE_NULL);
gst_element_set_state (stream->buffer, GST_STATE_NULL);
/* now remove this signal, we need this while going to NULL because it to /* now remove this signal, we need this while going to NULL because it to
* do some cleanups */ * do some cleanups */
if (stream->demux) if (stream->demux)
g_signal_handler_disconnect (stream->demux, stream->demux_padremoved_sig); g_signal_handler_disconnect (stream->demux, stream->demux_padremoved_sig);
gst_bin_remove (GST_BIN_CAST (bin), stream->buffer);
if (stream->demux) if (stream->demux)
gst_bin_remove (GST_BIN_CAST (bin), stream->demux); gst_bin_remove (GST_BIN_CAST (bin), stream->demux);
@ -2263,6 +2316,34 @@ gst_rtp_bin_class_init (GstRtpBinClass * klass)
request_rtcp_decoder), _gst_element_accumulator, NULL, request_rtcp_decoder), _gst_element_accumulator, NULL,
g_cclosure_marshal_generic, GST_TYPE_ELEMENT, 1, G_TYPE_UINT); g_cclosure_marshal_generic, GST_TYPE_ELEMENT, 1, G_TYPE_UINT);
/**
* GstRtpBin::request-jitterbuffer:
* @rtpbin: the object which received the signal
* @session: the session
*
* Request a jitterbuffer element for the given @session.
*
* If no handler is connected, the default jitterbuffer will be used.
*
* Note: The provided element is expected to conform to the API exposed
* by the standard #GstRtpJitterBuffer. Runtime checks will be made to
* determine whether it exposes properties and signals before attempting
* to set, call or connect to them, and some functionalities of #GstRtpBin
* may not be available when that is not the case.
*
* This should be considered experimental API, as the standard jitterbuffer
* API is susceptible to change, provided elements will have to update their
* custom jitterbuffer's API to match the API of #GstRtpJitterBuffer if and
* when it changes.
*
* Since: 1.18
*/
gst_rtp_bin_signals[SIGNAL_REQUEST_JITTERBUFFER] =
g_signal_new ("request-jitterbuffer", G_TYPE_FROM_CLASS (klass),
G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass,
request_jitterbuffer), _gst_element_accumulator, NULL,
g_cclosure_marshal_generic, GST_TYPE_ELEMENT, 1, G_TYPE_UINT);
/** /**
* GstRtpBin::new-jitterbuffer: * GstRtpBin::new-jitterbuffer:
* @rtpbin: the object which received the signal * @rtpbin: the object which received the signal
@ -2622,6 +2703,8 @@ gst_rtp_bin_class_init (GstRtpBinClass * klass)
klass->request_rtp_decoder = GST_DEBUG_FUNCPTR (gst_rtp_bin_request_decoder); klass->request_rtp_decoder = GST_DEBUG_FUNCPTR (gst_rtp_bin_request_decoder);
klass->request_rtcp_encoder = GST_DEBUG_FUNCPTR (gst_rtp_bin_request_encoder); klass->request_rtcp_encoder = GST_DEBUG_FUNCPTR (gst_rtp_bin_request_encoder);
klass->request_rtcp_decoder = GST_DEBUG_FUNCPTR (gst_rtp_bin_request_decoder); klass->request_rtcp_decoder = GST_DEBUG_FUNCPTR (gst_rtp_bin_request_decoder);
klass->request_jitterbuffer =
GST_DEBUG_FUNCPTR (gst_rtp_bin_request_jitterbuffer);
GST_DEBUG_CATEGORY_INIT (gst_rtp_bin_debug, "rtpbin", 0, "RTP bin"); GST_DEBUG_CATEGORY_INIT (gst_rtp_bin_debug, "rtpbin", 0, "RTP bin");
} }
@ -3157,8 +3240,10 @@ gst_rtp_bin_handle_message (GstBin * bin, GstMessage * message)
GstElement *element = stream->buffer; GstElement *element = stream->buffer;
guint64 last_out; guint64 last_out;
g_signal_emit_by_name (element, "set-active", active, offset, if (g_signal_lookup ("set-active", G_OBJECT_TYPE (element)) != 0) {
&last_out); g_signal_emit_by_name (element, "set-active", active, offset,
&last_out);
}
if (!active) { if (!active) {
g_object_get (element, "percent", &stream->percent, NULL); g_object_get (element, "percent", &stream->percent, NULL);
@ -3535,19 +3620,23 @@ new_ssrc_pad_found (GstElement * element, guint ssrc, GstPad * pad,
gst_object_unref (sinkpad); gst_object_unref (sinkpad);
gst_object_unref (srcpad); gst_object_unref (srcpad);
GST_DEBUG_OBJECT (rtpbin, "linking jitterbuffer RTCP");
padname = g_strdup_printf ("rtcp_src_%u", ssrc);
srcpad = gst_element_get_static_pad (element, padname);
g_free (padname);
sinkpad = gst_element_get_request_pad (stream->buffer, "sink_rtcp"); sinkpad = gst_element_get_request_pad (stream->buffer, "sink_rtcp");
gst_pad_link_full (srcpad, sinkpad, GST_PAD_LINK_CHECK_NOTHING); if (sinkpad) {
gst_object_unref (sinkpad); GST_DEBUG_OBJECT (rtpbin, "linking jitterbuffer RTCP");
gst_object_unref (srcpad); padname = g_strdup_printf ("rtcp_src_%u", ssrc);
srcpad = gst_element_get_static_pad (element, padname);
g_free (padname);
gst_pad_link_full (srcpad, sinkpad, GST_PAD_LINK_CHECK_NOTHING);
gst_object_unref (sinkpad);
gst_object_unref (srcpad);
}
/* connect to the RTCP sync signal from the jitterbuffer */ if (g_signal_lookup ("handle-sync", G_OBJECT_TYPE (stream->buffer)) != 0) {
GST_DEBUG_OBJECT (rtpbin, "connecting sync signal"); /* connect to the RTCP sync signal from the jitterbuffer */
stream->buffer_handlesync_sig = g_signal_connect (stream->buffer, GST_DEBUG_OBJECT (rtpbin, "connecting sync signal");
"handle-sync", (GCallback) gst_rtp_bin_handle_sync, stream); stream->buffer_handlesync_sig = g_signal_connect (stream->buffer,
"handle-sync", (GCallback) gst_rtp_bin_handle_sync, stream);
}
if (stream->demux) { if (stream->demux) {
/* connect to the new-pad signal of the payload demuxer, this will expose the /* connect to the new-pad signal of the payload demuxer, this will expose the

View file

@ -135,6 +135,8 @@ struct _GstRtpBinClass {
GstElement* (*request_fec_encoder) (GstRtpBin *rtpbin, guint session); GstElement* (*request_fec_encoder) (GstRtpBin *rtpbin, guint session);
GstElement* (*request_fec_decoder) (GstRtpBin *rtpbin, guint session); GstElement* (*request_fec_decoder) (GstRtpBin *rtpbin, guint session);
GstElement* (*request_jitterbuffer) (GstRtpBin *rtpbin, guint session);
void (*on_new_sender_ssrc) (GstRtpBin *rtpbin, guint session, guint32 ssrc); void (*on_new_sender_ssrc) (GstRtpBin *rtpbin, guint session, guint32 ssrc);
void (*on_sender_ssrc_active) (GstRtpBin *rtpbin, guint session, guint32 ssrc); void (*on_sender_ssrc_active) (GstRtpBin *rtpbin, guint session, guint32 ssrc);
}; };