rtprtxsend: don't start the task unless we are doing rtx

The rtxsend element can do pass-through when not enabled (no pt-map set)
and in those cases there is no point in starting an additional task
that does absolutely nothing.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/1880>
This commit is contained in:
Havard Graff 2016-11-04 11:47:20 +01:00 committed by GStreamer Marge Bot
parent 69863131bd
commit 4d31641302
2 changed files with 206 additions and 20 deletions

View file

@ -109,6 +109,8 @@ G_DEFINE_TYPE_WITH_CODE (GstRtpRtxSend, gst_rtp_rtx_send, GST_TYPE_ELEMENT,
GST_ELEMENT_REGISTER_DEFINE (rtprtxsend, "rtprtxsend", GST_RANK_NONE, GST_ELEMENT_REGISTER_DEFINE (rtprtxsend, "rtprtxsend", GST_RANK_NONE,
GST_TYPE_RTP_RTX_SEND); GST_TYPE_RTP_RTX_SEND);
#define IS_RTX_ENABLED(rtx) (g_hash_table_size ((rtx)->rtx_pt_map) > 0)
typedef struct typedef struct
{ {
guint16 seqnum; guint16 seqnum;
@ -152,6 +154,60 @@ ssrc_rtx_data_free (SSRCRtxData * data)
g_slice_free (SSRCRtxData, data); g_slice_free (SSRCRtxData, data);
} }
typedef enum
{
RTX_TASK_START,
RTX_TASK_PAUSE,
RTX_TASK_STOP,
} RtxTaskState;
static void
gst_rtp_rtx_send_set_flushing (GstRtpRtxSend * rtx, gboolean flush)
{
GST_OBJECT_LOCK (rtx);
gst_data_queue_set_flushing (rtx->queue, flush);
gst_data_queue_flush (rtx->queue);
GST_OBJECT_UNLOCK (rtx);
}
static gboolean
gst_rtp_rtx_send_set_task_state (GstRtpRtxSend * rtx, RtxTaskState task_state)
{
GstTask *task = GST_PAD_TASK (rtx->srcpad);
GstPadMode mode = GST_PAD_MODE (rtx->srcpad);
gboolean ret = TRUE;
switch (task_state) {
case RTX_TASK_START:
{
gboolean active = task && GST_TASK_STATE (task) == GST_TASK_STARTED;
if (IS_RTX_ENABLED (rtx) && mode != GST_PAD_MODE_NONE && !active) {
GST_DEBUG_OBJECT (rtx, "Starting RTX task");
gst_rtp_rtx_send_set_flushing (rtx, FALSE);
ret = gst_pad_start_task (rtx->srcpad,
(GstTaskFunction) gst_rtp_rtx_send_src_loop, rtx, NULL);
}
break;
}
case RTX_TASK_PAUSE:
if (task) {
GST_DEBUG_OBJECT (rtx, "Pausing RTX task");
gst_rtp_rtx_send_set_flushing (rtx, TRUE);
ret = gst_pad_pause_task (rtx->srcpad);
}
break;
case RTX_TASK_STOP:
if (task) {
GST_DEBUG_OBJECT (rtx, "Stopping RTX task");
gst_rtp_rtx_send_set_flushing (rtx, TRUE);
ret = gst_pad_stop_task (rtx->srcpad);
}
break;
}
return ret;
}
static void static void
gst_rtp_rtx_send_class_init (GstRtpRtxSendClass * klass) gst_rtp_rtx_send_class_init (GstRtpRtxSendClass * klass)
{ {
@ -287,15 +343,6 @@ gst_rtp_rtx_send_init (GstRtpRtxSend * rtx)
rtx->max_size_packets = DEFAULT_MAX_SIZE_PACKETS; rtx->max_size_packets = DEFAULT_MAX_SIZE_PACKETS;
} }
static void
gst_rtp_rtx_send_set_flushing (GstRtpRtxSend * rtx, gboolean flush)
{
GST_OBJECT_LOCK (rtx);
gst_data_queue_set_flushing (rtx->queue, flush);
gst_data_queue_flush (rtx->queue);
GST_OBJECT_UNLOCK (rtx);
}
static gboolean static gboolean
gst_rtp_rtx_send_queue_check_full (GstDataQueue * queue, gst_rtp_rtx_send_queue_check_full (GstDataQueue * queue,
guint visible, guint bytes, guint64 time, gpointer checkdata) guint visible, guint bytes, guint64 time, gpointer checkdata)
@ -609,14 +656,11 @@ gst_rtp_rtx_send_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
switch (GST_EVENT_TYPE (event)) { switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_FLUSH_START: case GST_EVENT_FLUSH_START:
gst_pad_push_event (rtx->srcpad, event); gst_pad_push_event (rtx->srcpad, event);
gst_rtp_rtx_send_set_flushing (rtx, TRUE); gst_rtp_rtx_send_set_task_state (rtx, RTX_TASK_PAUSE);
gst_pad_pause_task (rtx->srcpad);
return TRUE; return TRUE;
case GST_EVENT_FLUSH_STOP: case GST_EVENT_FLUSH_STOP:
gst_pad_push_event (rtx->srcpad, event); gst_pad_push_event (rtx->srcpad, event);
gst_rtp_rtx_send_set_flushing (rtx, FALSE); gst_rtp_rtx_send_set_task_state (rtx, RTX_TASK_START);
gst_pad_start_task (rtx->srcpad,
(GstTaskFunction) gst_rtp_rtx_send_src_loop, rtx, NULL);
return TRUE; return TRUE;
case GST_EVENT_EOS: case GST_EVENT_EOS:
GST_INFO_OBJECT (rtx, "Got EOS - enqueueing it"); GST_INFO_OBJECT (rtx, "Got EOS - enqueueing it");
@ -837,7 +881,7 @@ gst_rtp_rtx_send_src_loop (GstRtpRtxSend * rtx)
data->destroy (data); data->destroy (data);
} else { } else {
GST_LOG_OBJECT (rtx, "flushing"); GST_LOG_OBJECT (rtx, "flushing");
gst_pad_pause_task (rtx->srcpad); gst_rtp_rtx_send_set_task_state (rtx, RTX_TASK_PAUSE);
} }
} }
@ -851,12 +895,9 @@ gst_rtp_rtx_send_activate_mode (GstPad * pad, GstObject * parent,
switch (mode) { switch (mode) {
case GST_PAD_MODE_PUSH: case GST_PAD_MODE_PUSH:
if (active) { if (active) {
gst_rtp_rtx_send_set_flushing (rtx, FALSE); ret = gst_rtp_rtx_send_set_task_state (rtx, RTX_TASK_START);
ret = gst_pad_start_task (rtx->srcpad,
(GstTaskFunction) gst_rtp_rtx_send_src_loop, rtx, NULL);
} else { } else {
gst_rtp_rtx_send_set_flushing (rtx, TRUE); ret = gst_rtp_rtx_send_set_task_state (rtx, RTX_TASK_STOP);
ret = gst_pad_stop_task (rtx->srcpad);
} }
GST_INFO_OBJECT (rtx, "activate_mode: active %d, ret %d", active, ret); GST_INFO_OBJECT (rtx, "activate_mode: active %d, ret %d", active, ret);
break; break;
@ -948,6 +989,12 @@ gst_rtp_rtx_send_set_property (GObject * object,
gst_structure_foreach (rtx->rtx_pt_map_structure, structure_to_hash_table, gst_structure_foreach (rtx->rtx_pt_map_structure, structure_to_hash_table,
rtx->rtx_pt_map); rtx->rtx_pt_map);
GST_OBJECT_UNLOCK (rtx); GST_OBJECT_UNLOCK (rtx);
if (IS_RTX_ENABLED (rtx))
gst_rtp_rtx_send_set_task_state (rtx, RTX_TASK_START);
else
gst_rtp_rtx_send_set_task_state (rtx, RTX_TASK_STOP);
break; break;
case PROP_MAX_SIZE_TIME: case PROP_MAX_SIZE_TIME:
GST_OBJECT_LOCK (rtx); GST_OBJECT_LOCK (rtx);

View file

@ -146,6 +146,141 @@ create_rtp_buffer_with_timestamp (guint32 ssrc, guint8 payload_type,
return ret; return ret;
} }
static GstStructure *
create_rtx_map (const gchar * name, guint key, guint value)
{
gchar *key_str = g_strdup_printf ("%u", key);
GstStructure *s = gst_structure_new (name,
key_str, G_TYPE_UINT, value, NULL);
g_free (key_str);
return s;
}
GST_START_TEST (test_rtxsend_basic)
{
const guint32 main_ssrc = 1234567;
const guint main_pt = 96;
const guint32 rtx_ssrc = 7654321;
const guint rtx_pt = 106;
GstHarness *h = gst_harness_new ("rtprtxsend");
GstStructure *ssrc_map =
create_rtx_map ("application/x-rtp-ssrc-map", main_ssrc, rtx_ssrc);
GstStructure *pt_map =
create_rtx_map ("application/x-rtp-pt-map", main_pt, rtx_pt);
g_object_set (h->element, "ssrc-map", ssrc_map, NULL);
g_object_set (h->element, "payload-type-map", pt_map, NULL);
gst_harness_set_src_caps_str (h, "application/x-rtp, "
"media = (string)video, payload = (int)96, "
"ssrc = (uint)1234567, clock-rate = (int)90000, "
"encoding-name = (string)RAW");
/* push a packet */
fail_unless_equals_int (GST_FLOW_OK,
gst_harness_push (h, create_rtp_buffer (main_ssrc, main_pt, 0)));
/* and check it came through */
pull_and_verify (h, FALSE, main_ssrc, main_pt, 0);
/* now request this packet as rtx */
gst_harness_push_upstream_event (h, create_rtx_event (main_ssrc, main_pt, 0));
/* and verify we got an rtx-packet for it */
pull_and_verify (h, TRUE, rtx_ssrc, rtx_pt, 0);
gst_structure_free (ssrc_map);
gst_structure_free (pt_map);
gst_harness_teardown (h);
}
GST_END_TEST;
GST_START_TEST (test_rtxsend_disabled_enabled_disabled)
{
const guint32 main_ssrc = 1234567;
const guint main_pt = 96;
const guint32 rtx_ssrc = 7654321;
const guint rtx_pt = 106;
GstHarness *h = gst_harness_new ("rtprtxsend");
GstStructure *ssrc_map =
create_rtx_map ("application/x-rtp-ssrc-map", main_ssrc, rtx_ssrc);
GstStructure *pt_map =
create_rtx_map ("application/x-rtp-pt-map", main_pt, rtx_pt);
GstStructure *empty_pt_map =
gst_structure_new_empty ("application/x-rtp-pt-map");
/* set ssrc-map, but not pt-map, making the element work in passthrough */
g_object_set (h->element, "ssrc-map", ssrc_map, NULL);
gst_harness_set_src_caps_str (h, "application/x-rtp, "
"media = (string)video, payload = (int)96, "
"ssrc = (uint)1234567, clock-rate = (int)90000, "
"encoding-name = (string)RAW");
/* push, pull, request-rtx, verify nothing arrives */
fail_unless_equals_int (GST_FLOW_OK,
gst_harness_push (h, create_rtp_buffer (main_ssrc, main_pt, 0)));
pull_and_verify (h, FALSE, main_ssrc, main_pt, 0);
gst_harness_push_upstream_event (h, create_rtx_event (main_ssrc, main_pt, 0));
fail_unless_equals_int (0, gst_harness_buffers_in_queue (h));
/* verify there is no task on the rtxsend srcpad */
fail_unless (GST_PAD_TASK (GST_PAD_PEER (h->sinkpad)) == NULL);
/* now enable rtx by setting the pt-map */
g_object_set (h->element, "payload-type-map", pt_map, NULL);
/* push, pull, request rtx, pull rtx */
fail_unless_equals_int (GST_FLOW_OK,
gst_harness_push (h, create_rtp_buffer (main_ssrc, main_pt, 1)));
pull_and_verify (h, FALSE, main_ssrc, main_pt, 1);
gst_harness_push_upstream_event (h, create_rtx_event (main_ssrc, main_pt, 1));
pull_and_verify (h, TRUE, rtx_ssrc, rtx_pt, 1);
/* verify there is a task on the rtxsend srcpad */
fail_unless (GST_PAD_TASK (GST_PAD_PEER (h->sinkpad)) != NULL);
/* now enable disable rtx agian by setting an empty pt-map */
g_object_set (h->element, "payload-type-map", empty_pt_map, NULL);
/* push, pull, request-rtx, verify nothing arrives */
fail_unless_equals_int (GST_FLOW_OK,
gst_harness_push (h, create_rtp_buffer (main_ssrc, main_pt, 2)));
pull_and_verify (h, FALSE, main_ssrc, main_pt, 2);
gst_harness_push_upstream_event (h, create_rtx_event (main_ssrc, main_pt, 2));
fail_unless_equals_int (0, gst_harness_buffers_in_queue (h));
/* verify the task is gone again */
fail_unless (GST_PAD_TASK (GST_PAD_PEER (h->sinkpad)) == NULL);
gst_structure_free (ssrc_map);
gst_structure_free (pt_map);
gst_structure_free (empty_pt_map);
gst_harness_teardown (h);
}
GST_END_TEST;
GST_START_TEST (test_rtxsend_configured_not_playing_cleans_up)
{
GstElement *rtxsend = gst_element_factory_make ("rtprtxsend", NULL);
GstStructure *ssrc_map =
create_rtx_map ("application/x-rtp-ssrc-map", 123, 96);
GstStructure *pt_map = create_rtx_map ("application/x-rtp-pt-map", 321, 106);
g_object_set (rtxsend, "ssrc-map", ssrc_map, NULL);
g_object_set (rtxsend, "payload-type-map", pt_map, NULL);
gst_structure_free (ssrc_map);
gst_structure_free (pt_map);
g_usleep (G_USEC_PER_SEC);
gst_object_unref (rtxsend);
}
GST_END_TEST;
GST_START_TEST (test_rtxreceive_empty_rtx_packet) GST_START_TEST (test_rtxreceive_empty_rtx_packet)
{ {
guint rtx_ssrc = 7654321; guint rtx_ssrc = 7654321;
@ -788,6 +923,10 @@ rtprtx_suite (void)
suite_add_tcase (s, tc_chain); suite_add_tcase (s, tc_chain);
tcase_add_test (tc_chain, test_rtxsend_basic);
tcase_add_test (tc_chain, test_rtxsend_disabled_enabled_disabled);
tcase_add_test (tc_chain, test_rtxsend_configured_not_playing_cleans_up);
tcase_add_test (tc_chain, test_rtxreceive_empty_rtx_packet); tcase_add_test (tc_chain, test_rtxreceive_empty_rtx_packet);
tcase_add_test (tc_chain, test_rtxsend_rtxreceive); tcase_add_test (tc_chain, test_rtxsend_rtxreceive);
tcase_add_test (tc_chain, test_rtxsend_rtxreceive_with_packet_loss); tcase_add_test (tc_chain, test_rtxsend_rtxreceive_with_packet_loss);