diff --git a/gst/rtpmanager/gstrtpsession.c b/gst/rtpmanager/gstrtpsession.c index 3440a6a099..8f6c3cfc65 100644 --- a/gst/rtpmanager/gstrtpsession.c +++ b/gst/rtpmanager/gstrtpsession.c @@ -276,6 +276,12 @@ struct _GstRtpSessionPrivate guint recv_rtx_req_count; guint sent_rtx_req_count; + + /* + * This is the list of processed packets in the receive path when upstream + * pushed a buffer list. + */ + GstBufferList *processed_list; }; /* callbacks to handle actions from the session manager */ @@ -301,6 +307,8 @@ static void gst_rtp_session_notify_early_rtcp (RTPSession * sess, gpointer user_data); static GstFlowReturn gst_rtp_session_chain_recv_rtp (GstPad * pad, GstObject * parent, GstBuffer * buffer); +static GstFlowReturn gst_rtp_session_chain_recv_rtp_list (GstPad * pad, + GstObject * parent, GstBufferList * list); static GstFlowReturn gst_rtp_session_chain_recv_rtcp (GstPad * pad, GstObject * parent, GstBuffer * buffer); static GstFlowReturn gst_rtp_session_chain_send_rtp (GstPad * pad, @@ -804,6 +812,7 @@ gst_rtp_session_class_init (GstRtpSessionClass * klass) "rtpsession", 0, "RTP Session"); GST_DEBUG_REGISTER_FUNCPTR (gst_rtp_session_chain_recv_rtp); + GST_DEBUG_REGISTER_FUNCPTR (gst_rtp_session_chain_recv_rtp_list); GST_DEBUG_REGISTER_FUNCPTR (gst_rtp_session_chain_recv_rtcp); GST_DEBUG_REGISTER_FUNCPTR (gst_rtp_session_chain_send_rtp); GST_DEBUG_REGISTER_FUNCPTR (gst_rtp_session_chain_send_rtp_list); @@ -1316,8 +1325,7 @@ gst_rtp_session_clear_pt_map (GstRtpSession * rtpsession) g_hash_table_foreach_remove (rtpsession->priv->ptmap, return_true, NULL); } -/* called when the session manager has an RTP packet or a list of packets - * ready for further processing */ +/* called when the session manager has an RTP packet ready to be pushed */ static GstFlowReturn gst_rtp_session_process_rtp (RTPSession * sess, RTPSource * src, GstBuffer * buffer, gpointer user_data) @@ -1334,8 +1342,14 @@ gst_rtp_session_process_rtp (RTPSession * sess, RTPSource * src, GST_RTP_SESSION_UNLOCK (rtpsession); if (rtp_src) { - GST_LOG_OBJECT (rtpsession, "pushing received RTP packet"); - result = gst_pad_push (rtp_src, buffer); + if (rtpsession->priv->processed_list) { + GST_LOG_OBJECT (rtpsession, "queueing received RTP packet"); + gst_buffer_list_add (rtpsession->priv->processed_list, buffer); + result = GST_FLOW_OK; + } else { + GST_LOG_OBJECT (rtpsession, "pushing received RTP packet"); + result = gst_pad_push (rtp_src, buffer); + } gst_object_unref (rtp_src); } else { GST_DEBUG_OBJECT (rtpsession, "dropping received RTP packet"); @@ -1971,6 +1985,60 @@ push_error: } } +static gboolean +process_received_buffer_in_list (GstBuffer ** buffer, guint idx, gpointer data) +{ + gint ret; + + ret = gst_rtp_session_chain_recv_rtp (NULL, data, *buffer); + if (ret != GST_FLOW_OK) + GST_ERROR ("Processing individual buffer in a list failed"); + + /* + * The buffer has been processed, remove it from the original list, if it was + * a valid RTP buffer it has been added to the "processed" list in + * gst_rtp_session_process_rtp(). + */ + *buffer = NULL; + return TRUE; +} + +static GstFlowReturn +gst_rtp_session_chain_recv_rtp_list (GstPad * pad, GstObject * parent, + GstBufferList * list) +{ + GstRtpSession *rtpsession = GST_RTP_SESSION (parent); + GstBufferList *processed_list; + + processed_list = gst_buffer_list_new (); + + /* Set some private data to detect that a buffer list is being pushed. */ + rtpsession->priv->processed_list = processed_list; + + /* + * Individually process the buffers from the incoming buffer list as the + * incoming RTP packets in the list can be mixed in all sorts of ways: + * - different frames, + * - different sources, + * - different types (RTP or RTCP) + */ + gst_buffer_list_foreach (list, + (GstBufferListFunc) process_received_buffer_in_list, parent); + + gst_buffer_list_unref (list); + + /* Clean up private data in case the next push does not use a buffer list. */ + rtpsession->priv->processed_list = NULL; + + if (gst_buffer_list_length (processed_list) == 0 || !rtpsession->recv_rtp_src) { + gst_buffer_list_unref (processed_list); + return GST_FLOW_OK; + } + + GST_LOG_OBJECT (rtpsession, "pushing received RTP list"); + return gst_pad_push_list (rtpsession->recv_rtp_src, processed_list); +} + static gboolean gst_rtp_session_event_recv_rtcp_sink (GstPad * pad, GstObject * parent, GstEvent * event) @@ -2372,6 +2440,8 @@ create_recv_rtp_sink (GstRtpSession * rtpsession) "recv_rtp_sink"); gst_pad_set_chain_function (rtpsession->recv_rtp_sink, gst_rtp_session_chain_recv_rtp); + gst_pad_set_chain_list_function (rtpsession->recv_rtp_sink, + gst_rtp_session_chain_recv_rtp_list); gst_pad_set_event_function (rtpsession->recv_rtp_sink, gst_rtp_session_event_recv_rtp_sink); gst_pad_set_iterate_internal_links_function (rtpsession->recv_rtp_sink,