mirror of
https://gitlab.freedesktop.org/gstreamer/gstreamer.git
synced 2025-03-30 12:49:40 +00:00
rtpsession: add support for buffer lists on the recv path
The send path in rtpsession processes the buffer list along the way, sharing info and stats between packets in the same list, because it assumes that all packets in a buffer list are from the same frame. However, in the receiving path packets can arrive in all sorts of arrangements: - different sources, - different frames (different timestamps), - different types (multiplexed RTP and RTCP, invalid RTP packets). so a more general approach should be used to correctly support buffer lists in the receive path. It turns out that it's simpler and more robust to process buffers individually inside the rtpsession element even if they come in a buffer list, and then reassemble a new buffer list when pushing the buffers downstream. This avoids complicating the existing code to make all functions buffer-list-aware with the risk of introducing regressions, To support buffer lists in the receive path and reduce the "push overhead" in the pipeline, a new private field named processed_list is added to GstRtpSessionPrivate, it is set in the chain_list handler and used in the process_rtp callback; this is to achieve the following: - iterate over the incoming buffer list; - process the packets one by one; - add the valid ones to a new buffer list; - push the new buffer list downstream. The processed_list field is reset before pushing a buffer list to be on the safe side in case a single buffer was to be pushed by upstream at some later point. NOTE: The proposed modifications do not change the behavior of the send path. The process_rtp callback is called in rtpsource.c by the push_rtp callback (via source_push_rtp) only when the source is not internal. So even though push_rtp is also called in the send path, it won't end up using process_rtp in this case because the source would be internal in the send path. The reasoning from above may suggest a future refactoring: push_rtp might be split to better differentiate the send and receive path.
This commit is contained in:
parent
b0534c65d1
commit
8dd03042cc
1 changed files with 74 additions and 4 deletions
|
@ -276,6 +276,12 @@ struct _GstRtpSessionPrivate
|
|||
|
||||
guint recv_rtx_req_count;
|
||||
guint sent_rtx_req_count;
|
||||
|
||||
/*
|
||||
* This is the list of processed packets in the receive path when upstream
|
||||
* pushed a buffer list.
|
||||
*/
|
||||
GstBufferList *processed_list;
|
||||
};
|
||||
|
||||
/* callbacks to handle actions from the session manager */
|
||||
|
@ -301,6 +307,8 @@ static void gst_rtp_session_notify_early_rtcp (RTPSession * sess,
|
|||
gpointer user_data);
|
||||
static GstFlowReturn gst_rtp_session_chain_recv_rtp (GstPad * pad,
|
||||
GstObject * parent, GstBuffer * buffer);
|
||||
static GstFlowReturn gst_rtp_session_chain_recv_rtp_list (GstPad * pad,
|
||||
GstObject * parent, GstBufferList * list);
|
||||
static GstFlowReturn gst_rtp_session_chain_recv_rtcp (GstPad * pad,
|
||||
GstObject * parent, GstBuffer * buffer);
|
||||
static GstFlowReturn gst_rtp_session_chain_send_rtp (GstPad * pad,
|
||||
|
@ -804,6 +812,7 @@ gst_rtp_session_class_init (GstRtpSessionClass * klass)
|
|||
"rtpsession", 0, "RTP Session");
|
||||
|
||||
GST_DEBUG_REGISTER_FUNCPTR (gst_rtp_session_chain_recv_rtp);
|
||||
GST_DEBUG_REGISTER_FUNCPTR (gst_rtp_session_chain_recv_rtp_list);
|
||||
GST_DEBUG_REGISTER_FUNCPTR (gst_rtp_session_chain_recv_rtcp);
|
||||
GST_DEBUG_REGISTER_FUNCPTR (gst_rtp_session_chain_send_rtp);
|
||||
GST_DEBUG_REGISTER_FUNCPTR (gst_rtp_session_chain_send_rtp_list);
|
||||
|
@ -1316,8 +1325,7 @@ gst_rtp_session_clear_pt_map (GstRtpSession * rtpsession)
|
|||
g_hash_table_foreach_remove (rtpsession->priv->ptmap, return_true, NULL);
|
||||
}
|
||||
|
||||
/* called when the session manager has an RTP packet or a list of packets
|
||||
* ready for further processing */
|
||||
/* called when the session manager has an RTP packet ready to be pushed */
|
||||
static GstFlowReturn
|
||||
gst_rtp_session_process_rtp (RTPSession * sess, RTPSource * src,
|
||||
GstBuffer * buffer, gpointer user_data)
|
||||
|
@ -1334,8 +1342,14 @@ gst_rtp_session_process_rtp (RTPSession * sess, RTPSource * src,
|
|||
GST_RTP_SESSION_UNLOCK (rtpsession);
|
||||
|
||||
if (rtp_src) {
|
||||
GST_LOG_OBJECT (rtpsession, "pushing received RTP packet");
|
||||
result = gst_pad_push (rtp_src, buffer);
|
||||
if (rtpsession->priv->processed_list) {
|
||||
GST_LOG_OBJECT (rtpsession, "queueing received RTP packet");
|
||||
gst_buffer_list_add (rtpsession->priv->processed_list, buffer);
|
||||
result = GST_FLOW_OK;
|
||||
} else {
|
||||
GST_LOG_OBJECT (rtpsession, "pushing received RTP packet");
|
||||
result = gst_pad_push (rtp_src, buffer);
|
||||
}
|
||||
gst_object_unref (rtp_src);
|
||||
} else {
|
||||
GST_DEBUG_OBJECT (rtpsession, "dropping received RTP packet");
|
||||
|
@ -1971,6 +1985,60 @@ push_error:
|
|||
}
|
||||
}
|
||||
|
||||
static gboolean
|
||||
process_received_buffer_in_list (GstBuffer ** buffer, guint idx, gpointer data)
|
||||
{
|
||||
gint ret;
|
||||
|
||||
ret = gst_rtp_session_chain_recv_rtp (NULL, data, *buffer);
|
||||
if (ret != GST_FLOW_OK)
|
||||
GST_ERROR ("Processing individual buffer in a list failed");
|
||||
|
||||
/*
|
||||
* The buffer has been processed, remove it from the original list, if it was
|
||||
* a valid RTP buffer it has been added to the "processed" list in
|
||||
* gst_rtp_session_process_rtp().
|
||||
*/
|
||||
*buffer = NULL;
|
||||
return TRUE;
|
||||
}
|
||||
|
||||
static GstFlowReturn
|
||||
gst_rtp_session_chain_recv_rtp_list (GstPad * pad, GstObject * parent,
|
||||
GstBufferList * list)
|
||||
{
|
||||
GstRtpSession *rtpsession = GST_RTP_SESSION (parent);
|
||||
GstBufferList *processed_list;
|
||||
|
||||
processed_list = gst_buffer_list_new ();
|
||||
|
||||
/* Set some private data to detect that a buffer list is being pushed. */
|
||||
rtpsession->priv->processed_list = processed_list;
|
||||
|
||||
/*
|
||||
* Individually process the buffers from the incoming buffer list as the
|
||||
* incoming RTP packets in the list can be mixed in all sorts of ways:
|
||||
* - different frames,
|
||||
* - different sources,
|
||||
* - different types (RTP or RTCP)
|
||||
*/
|
||||
gst_buffer_list_foreach (list,
|
||||
(GstBufferListFunc) process_received_buffer_in_list, parent);
|
||||
|
||||
gst_buffer_list_unref (list);
|
||||
|
||||
/* Clean up private data in case the next push does not use a buffer list. */
|
||||
rtpsession->priv->processed_list = NULL;
|
||||
|
||||
if (gst_buffer_list_length (processed_list) == 0 || !rtpsession->recv_rtp_src) {
|
||||
gst_buffer_list_unref (processed_list);
|
||||
return GST_FLOW_OK;
|
||||
}
|
||||
|
||||
GST_LOG_OBJECT (rtpsession, "pushing received RTP list");
|
||||
return gst_pad_push_list (rtpsession->recv_rtp_src, processed_list);
|
||||
}
|
||||
|
||||
static gboolean
|
||||
gst_rtp_session_event_recv_rtcp_sink (GstPad * pad, GstObject * parent,
|
||||
GstEvent * event)
|
||||
|
@ -2372,6 +2440,8 @@ create_recv_rtp_sink (GstRtpSession * rtpsession)
|
|||
"recv_rtp_sink");
|
||||
gst_pad_set_chain_function (rtpsession->recv_rtp_sink,
|
||||
gst_rtp_session_chain_recv_rtp);
|
||||
gst_pad_set_chain_list_function (rtpsession->recv_rtp_sink,
|
||||
gst_rtp_session_chain_recv_rtp_list);
|
||||
gst_pad_set_event_function (rtpsession->recv_rtp_sink,
|
||||
gst_rtp_session_event_recv_rtp_sink);
|
||||
gst_pad_set_iterate_internal_links_function (rtpsession->recv_rtp_sink,
|
||||
|
|
Loading…
Reference in a new issue