From 133913a11ae35656797a7101ad0e01d484bea46b Mon Sep 17 00:00:00 2001 From: George Kiagiadakis Date: Wed, 15 Jan 2014 09:46:14 +0100 Subject: [PATCH] rtprtxsend: run a new GstTask on the src pad The reason behind this is to minimize the retransmission delay. Previously, when a NACK was received, rtprtxsend would put a retransmission packet in a queue and it would send it from chain(), i.e. only after a new buffer would arrive. This unfortunately was causing big delays, in the order of 60-100 ms, which can be critical for the receiver side. By having a separate GstTask for pushing buffers out of rtxsend, we can push buffers out right after receiving the event, without waiting for chain() to get called. --- gst/rtpmanager/gstrtprtxsend.c | 154 ++++++++++++++++++++++++++------- gst/rtpmanager/gstrtprtxsend.h | 5 +- 2 files changed, 125 insertions(+), 34 deletions(-) diff --git a/gst/rtpmanager/gstrtprtxsend.c b/gst/rtpmanager/gstrtprtxsend.c index d5fe267b38..e20a0be916 100644 --- a/gst/rtpmanager/gstrtprtxsend.c +++ b/gst/rtpmanager/gstrtprtxsend.c @@ -77,6 +77,9 @@ static GstStaticPadTemplate sink_factory = GST_STATIC_PAD_TEMPLATE ("sink", GST_STATIC_CAPS ("application/x-rtp") ); +static gboolean gst_rtp_rtx_send_queue_check_full (GstDataQueue * queue, + guint visible, guint bytes, guint64 time, gpointer checkdata); + static gboolean gst_rtp_rtx_send_src_event (GstPad * pad, GstObject * parent, GstEvent * event); static gboolean gst_rtp_rtx_send_sink_event (GstPad * pad, GstObject * parent, @@ -84,6 +87,10 @@ static gboolean gst_rtp_rtx_send_sink_event (GstPad * pad, GstObject * parent, static GstFlowReturn gst_rtp_rtx_send_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer); +static void gst_rtp_rtx_send_src_loop (GstRtpRtxSend * rtx); +static gboolean gst_rtp_rtx_send_activate_mode (GstPad * pad, + GstObject * parent, GstPadMode mode, gboolean active); + static GstStateChangeReturn gst_rtp_rtx_send_change_state (GstElement * element, GstStateChange transition); @@ -201,8 +208,7 @@ static void gst_rtp_rtx_send_reset (GstRtpRtxSend * rtx) { GST_OBJECT_LOCK (rtx); - g_queue_foreach (rtx->pending, (GFunc) gst_buffer_unref, NULL); - g_queue_clear (rtx->pending); + gst_data_queue_flush (rtx->queue); g_hash_table_remove_all (rtx->ssrc_data); g_hash_table_remove_all (rtx->rtx_ssrcs); rtx->num_rtx_requests = 0; @@ -222,7 +228,7 @@ gst_rtp_rtx_send_finalize (GObject * object) g_hash_table_unref (rtx->rtx_pt_map); if (rtx->rtx_pt_map_structure) gst_structure_free (rtx->rtx_pt_map_structure); - g_queue_free_full (rtx->pending, (GDestroyNotify) gst_buffer_unref); + g_object_unref (rtx->queue); G_OBJECT_CLASS (gst_rtp_rtx_send_parent_class)->finalize (object); } @@ -239,6 +245,8 @@ gst_rtp_rtx_send_init (GstRtpRtxSend * rtx) GST_PAD_SET_PROXY_ALLOCATION (rtx->srcpad); gst_pad_set_event_function (rtx->srcpad, GST_DEBUG_FUNCPTR (gst_rtp_rtx_send_src_event)); + gst_pad_set_activatemode_function (rtx->srcpad, + GST_DEBUG_FUNCPTR (gst_rtp_rtx_send_activate_mode)); gst_element_add_pad (GST_ELEMENT (rtx), rtx->srcpad); rtx->sinkpad = @@ -252,7 +260,8 @@ gst_rtp_rtx_send_init (GstRtpRtxSend * rtx) GST_DEBUG_FUNCPTR (gst_rtp_rtx_send_chain)); gst_element_add_pad (GST_ELEMENT (rtx), rtx->sinkpad); - rtx->pending = g_queue_new (); + rtx->queue = gst_data_queue_new (gst_rtp_rtx_send_queue_check_full, NULL, + NULL, rtx); rtx->ssrc_data = g_hash_table_new_full (g_direct_hash, g_direct_equal, NULL, (GDestroyNotify) ssrc_rtx_data_free); rtx->rtx_ssrcs = g_hash_table_new (g_direct_hash, g_direct_equal); @@ -262,6 +271,52 @@ gst_rtp_rtx_send_init (GstRtpRtxSend * rtx) rtx->max_size_packets = DEFAULT_MAX_SIZE_PACKETS; } +static void +gst_rtp_rtx_send_set_flushing (GstRtpRtxSend * rtx, gboolean flush) +{ + GST_OBJECT_LOCK (rtx); + gst_data_queue_set_flushing (rtx->queue, flush); + gst_data_queue_flush (rtx->queue); + GST_OBJECT_UNLOCK (rtx); +} + +static gboolean +gst_rtp_rtx_send_queue_check_full (GstDataQueue * queue, + guint visible, guint bytes, guint64 time, gpointer checkdata) +{ + return FALSE; +} + +static void +gst_rtp_rtx_data_queue_item_free (gpointer item) +{ + GstDataQueueItem *data = item; + if (data->object) + gst_mini_object_unref (data->object); + g_slice_free (GstDataQueueItem, data); +} + +static gboolean +gst_rtp_rtx_send_push_out (GstRtpRtxSend * rtx, gpointer object) +{ + GstDataQueueItem *data; + gboolean success; + + data = g_slice_new0 (GstDataQueueItem); + data->object = GST_MINI_OBJECT (object); + data->size = 1; + data->duration = 1; + data->visible = TRUE; + data->destroy = gst_rtp_rtx_data_queue_item_free; + + success = gst_data_queue_push (rtx->queue, data); + + if (!success) + data->destroy (data); + + return success; +} + static guint32 gst_rtp_rtx_send_choose_ssrc (GstRtpRtxSend * rtx, guint32 choice, gboolean consider_choice) @@ -399,6 +454,7 @@ gst_rtp_rtx_send_src_event (GstPad * pad, GstObject * parent, GstEvent * event) if (gst_structure_has_name (s, "GstRTPRetransmissionRequest")) { guint seqnum = 0; guint ssrc = 0; + GstBuffer *rtx_buf = NULL; /* retrieve seqnum of the packet that need to be restransmisted */ if (!gst_structure_get_uint (s, "seqnum", &seqnum)) @@ -430,12 +486,14 @@ gst_rtp_rtx_send_src_event (GstPad * pad, GstObject * parent, GstEvent * event) if (iter) { BufferQueueItem *item = g_sequence_get (iter); GST_DEBUG_OBJECT (rtx, "found %" G_GUINT16_FORMAT, item->seqnum); - g_queue_push_tail (rtx->pending, - gst_rtp_rtx_buffer_new (rtx, item->buffer)); + rtx_buf = gst_rtp_rtx_buffer_new (rtx, item->buffer); } } GST_OBJECT_UNLOCK (rtx); + if (rtx_buf) + gst_rtp_rtx_send_push_out (rtx, rtx_buf); + gst_event_unref (event); res = TRUE; @@ -508,6 +566,17 @@ gst_rtp_rtx_send_sink_event (GstPad * pad, GstObject * parent, GstEvent * event) GstRtpRtxSend *rtx = GST_RTP_RTX_SEND (parent); switch (GST_EVENT_TYPE (event)) { + case GST_EVENT_FLUSH_START: + gst_pad_push_event (rtx->srcpad, event); + gst_rtp_rtx_send_set_flushing (rtx, TRUE); + gst_pad_pause_task (rtx->srcpad); + return TRUE; + case GST_EVENT_FLUSH_STOP: + gst_pad_push_event (rtx->srcpad, event); + gst_rtp_rtx_send_set_flushing (rtx, FALSE); + gst_pad_start_task (rtx->srcpad, + (GstTaskFunction) gst_rtp_rtx_send_src_loop, rtx, NULL); + return TRUE; case GST_EVENT_CAPS: { GstCaps *caps; @@ -566,20 +635,11 @@ gst_rtp_rtx_send_get_ts_diff (SSRCRtxData * data) return (guint32) gst_util_uint64_scale_int (result, 1000, data->clock_rate); } -/* push pending retransmission packet. - * it constructs rtx packet from original packets */ -static void -do_push (GstBuffer * buffer, GstRtpRtxSend * rtx) -{ - gst_pad_push (rtx->srcpad, buffer); -} - static GstFlowReturn gst_rtp_rtx_send_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer) { GstRtpRtxSend *rtx = GST_RTP_RTX_SEND (parent); GstFlowReturn ret = GST_FLOW_ERROR; - GQueue *pending = NULL; GstRTPBuffer rtp = GST_RTP_BUFFER_INIT; BufferQueueItem *item; SSRCRtxData *data; @@ -619,34 +679,64 @@ gst_rtp_rtx_send_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer) } } - /* within lock, get packets that have to be retransmited */ - if (g_queue_get_length (rtx->pending) > 0) { - pending = rtx->pending; - rtx->pending = g_queue_new (); - - /* update statistics - assume we will succeed to retransmit those packets */ - rtx->num_rtx_packets += g_queue_get_length (pending); - } - - /* no need to hold the lock to push rtx packets */ GST_OBJECT_UNLOCK (rtx); - /* retransmit requested packets */ - if (pending) { - g_queue_foreach (pending, (GFunc) do_push, rtx); - g_queue_free (pending); - } - GST_LOG_OBJECT (rtx, "push seqnum: %" G_GUINT16_FORMAT ", ssrc: %" G_GUINT32_FORMAT, seqnum, ssrc); - /* push current rtp packet */ ret = gst_pad_push (rtx->srcpad, buffer); return ret; } +static void +gst_rtp_rtx_send_src_loop (GstRtpRtxSend * rtx) +{ + GstDataQueueItem *data; + + if (gst_data_queue_pop (rtx->queue, &data)) { + GST_LOG_OBJECT (rtx, "pushing rtx buffer %p", data->object); + + gst_pad_push (rtx->srcpad, GST_BUFFER (data->object)); + + GST_OBJECT_LOCK (rtx); + rtx->num_rtx_packets++; + GST_OBJECT_UNLOCK (rtx); + + data->object = NULL; /* we no longer own that object */ + data->destroy (data); + } else { + GST_LOG_OBJECT (rtx, "flushing"); + gst_pad_pause_task (rtx->srcpad); + } +} + +static gboolean +gst_rtp_rtx_send_activate_mode (GstPad * pad, GstObject * parent, + GstPadMode mode, gboolean active) +{ + GstRtpRtxSend *rtx = GST_RTP_RTX_SEND (parent); + gboolean ret = FALSE; + + switch (mode) { + case GST_PAD_MODE_PUSH: + if (active) { + gst_rtp_rtx_send_set_flushing (rtx, FALSE); + ret = gst_pad_start_task (rtx->srcpad, + (GstTaskFunction) gst_rtp_rtx_send_src_loop, rtx, NULL); + } else { + gst_rtp_rtx_send_set_flushing (rtx, TRUE); + ret = gst_pad_stop_task (rtx->srcpad); + } + GST_INFO_OBJECT (rtx, "activate_mode: active %d, ret %d", active, ret); + break; + default: + break; + } + return ret; +} + static void gst_rtp_rtx_send_get_property (GObject * object, guint prop_id, GValue * value, GParamSpec * pspec) diff --git a/gst/rtpmanager/gstrtprtxsend.h b/gst/rtpmanager/gstrtprtxsend.h index 88f5ae9460..5f084e8e78 100644 --- a/gst/rtpmanager/gstrtprtxsend.h +++ b/gst/rtpmanager/gstrtprtxsend.h @@ -26,6 +26,7 @@ #include #include +#include G_BEGIN_DECLS #define GST_TYPE_RTP_RTX_SEND (gst_rtp_rtx_send_get_type()) @@ -45,8 +46,8 @@ struct _GstRtpRtxSend GstPad *sinkpad; GstPad *srcpad; - /* rtp packets that will be pushed upon next buffer */ - GQueue *pending; + /* rtp packets that will be pushed out */ + GstDataQueue *queue; /* ssrc -> SSRCRtxData */ GHashTable *ssrc_data;