diff --git a/subprojects/gst-plugins-good/gst/rtpmanager/gstrtprtxsend.c b/subprojects/gst-plugins-good/gst/rtpmanager/gstrtprtxsend.c index e873d451c9..4883ee7b7e 100644 --- a/subprojects/gst-plugins-good/gst/rtpmanager/gstrtprtxsend.c +++ b/subprojects/gst-plugins-good/gst/rtpmanager/gstrtprtxsend.c @@ -109,6 +109,8 @@ G_DEFINE_TYPE_WITH_CODE (GstRtpRtxSend, gst_rtp_rtx_send, GST_TYPE_ELEMENT, GST_ELEMENT_REGISTER_DEFINE (rtprtxsend, "rtprtxsend", GST_RANK_NONE, GST_TYPE_RTP_RTX_SEND); +#define IS_RTX_ENABLED(rtx) (g_hash_table_size ((rtx)->rtx_pt_map) > 0) + typedef struct { guint16 seqnum; @@ -152,6 +154,60 @@ ssrc_rtx_data_free (SSRCRtxData * data) g_slice_free (SSRCRtxData, data); } +typedef enum +{ + RTX_TASK_START, + RTX_TASK_PAUSE, + RTX_TASK_STOP, +} RtxTaskState; + +static void +gst_rtp_rtx_send_set_flushing (GstRtpRtxSend * rtx, gboolean flush) +{ + GST_OBJECT_LOCK (rtx); + gst_data_queue_set_flushing (rtx->queue, flush); + gst_data_queue_flush (rtx->queue); + GST_OBJECT_UNLOCK (rtx); +} + +static gboolean +gst_rtp_rtx_send_set_task_state (GstRtpRtxSend * rtx, RtxTaskState task_state) +{ + GstTask *task = GST_PAD_TASK (rtx->srcpad); + GstPadMode mode = GST_PAD_MODE (rtx->srcpad); + gboolean ret = TRUE; + + switch (task_state) { + case RTX_TASK_START: + { + gboolean active = task && GST_TASK_STATE (task) == GST_TASK_STARTED; + if (IS_RTX_ENABLED (rtx) && mode != GST_PAD_MODE_NONE && !active) { + GST_DEBUG_OBJECT (rtx, "Starting RTX task"); + gst_rtp_rtx_send_set_flushing (rtx, FALSE); + ret = gst_pad_start_task (rtx->srcpad, + (GstTaskFunction) gst_rtp_rtx_send_src_loop, rtx, NULL); + } + break; + } + case RTX_TASK_PAUSE: + if (task) { + GST_DEBUG_OBJECT (rtx, "Pausing RTX task"); + gst_rtp_rtx_send_set_flushing (rtx, TRUE); + ret = gst_pad_pause_task (rtx->srcpad); + } + break; + case RTX_TASK_STOP: + if (task) { + GST_DEBUG_OBJECT (rtx, "Stopping RTX task"); + gst_rtp_rtx_send_set_flushing (rtx, TRUE); + ret = gst_pad_stop_task (rtx->srcpad); + } + break; + } + + return ret; +} + static void gst_rtp_rtx_send_class_init (GstRtpRtxSendClass * klass) { @@ -287,15 +343,6 @@ gst_rtp_rtx_send_init (GstRtpRtxSend * rtx) rtx->max_size_packets = DEFAULT_MAX_SIZE_PACKETS; } -static void -gst_rtp_rtx_send_set_flushing (GstRtpRtxSend * rtx, gboolean flush) -{ - GST_OBJECT_LOCK (rtx); - gst_data_queue_set_flushing (rtx->queue, flush); - gst_data_queue_flush (rtx->queue); - GST_OBJECT_UNLOCK (rtx); -} - static gboolean gst_rtp_rtx_send_queue_check_full (GstDataQueue * queue, guint visible, guint bytes, guint64 time, gpointer checkdata) @@ -609,14 +656,11 @@ gst_rtp_rtx_send_sink_event (GstPad * pad, GstObject * parent, GstEvent * event) switch (GST_EVENT_TYPE (event)) { case GST_EVENT_FLUSH_START: gst_pad_push_event (rtx->srcpad, event); - gst_rtp_rtx_send_set_flushing (rtx, TRUE); - gst_pad_pause_task (rtx->srcpad); + gst_rtp_rtx_send_set_task_state (rtx, RTX_TASK_PAUSE); return TRUE; case GST_EVENT_FLUSH_STOP: gst_pad_push_event (rtx->srcpad, event); - gst_rtp_rtx_send_set_flushing (rtx, FALSE); - gst_pad_start_task (rtx->srcpad, - (GstTaskFunction) gst_rtp_rtx_send_src_loop, rtx, NULL); + gst_rtp_rtx_send_set_task_state (rtx, RTX_TASK_START); return TRUE; case GST_EVENT_EOS: GST_INFO_OBJECT (rtx, "Got EOS - enqueueing it"); @@ -837,7 +881,7 @@ gst_rtp_rtx_send_src_loop (GstRtpRtxSend * rtx) data->destroy (data); } else { GST_LOG_OBJECT (rtx, "flushing"); - gst_pad_pause_task (rtx->srcpad); + gst_rtp_rtx_send_set_task_state (rtx, RTX_TASK_PAUSE); } } @@ -851,12 +895,9 @@ gst_rtp_rtx_send_activate_mode (GstPad * pad, GstObject * parent, switch (mode) { case GST_PAD_MODE_PUSH: if (active) { - gst_rtp_rtx_send_set_flushing (rtx, FALSE); - ret = gst_pad_start_task (rtx->srcpad, - (GstTaskFunction) gst_rtp_rtx_send_src_loop, rtx, NULL); + ret = gst_rtp_rtx_send_set_task_state (rtx, RTX_TASK_START); } else { - gst_rtp_rtx_send_set_flushing (rtx, TRUE); - ret = gst_pad_stop_task (rtx->srcpad); + ret = gst_rtp_rtx_send_set_task_state (rtx, RTX_TASK_STOP); } GST_INFO_OBJECT (rtx, "activate_mode: active %d, ret %d", active, ret); break; @@ -948,6 +989,12 @@ gst_rtp_rtx_send_set_property (GObject * object, gst_structure_foreach (rtx->rtx_pt_map_structure, structure_to_hash_table, rtx->rtx_pt_map); GST_OBJECT_UNLOCK (rtx); + + if (IS_RTX_ENABLED (rtx)) + gst_rtp_rtx_send_set_task_state (rtx, RTX_TASK_START); + else + gst_rtp_rtx_send_set_task_state (rtx, RTX_TASK_STOP); + break; case PROP_MAX_SIZE_TIME: GST_OBJECT_LOCK (rtx); diff --git a/subprojects/gst-plugins-good/tests/check/elements/rtprtx.c b/subprojects/gst-plugins-good/tests/check/elements/rtprtx.c index 2e4c6516ea..64f43835a4 100644 --- a/subprojects/gst-plugins-good/tests/check/elements/rtprtx.c +++ b/subprojects/gst-plugins-good/tests/check/elements/rtprtx.c @@ -146,6 +146,141 @@ create_rtp_buffer_with_timestamp (guint32 ssrc, guint8 payload_type, return ret; } +static GstStructure * +create_rtx_map (const gchar * name, guint key, guint value) +{ + gchar *key_str = g_strdup_printf ("%u", key); + GstStructure *s = gst_structure_new (name, + key_str, G_TYPE_UINT, value, NULL); + g_free (key_str); + return s; +} + +GST_START_TEST (test_rtxsend_basic) +{ + const guint32 main_ssrc = 1234567; + const guint main_pt = 96; + const guint32 rtx_ssrc = 7654321; + const guint rtx_pt = 106; + + GstHarness *h = gst_harness_new ("rtprtxsend"); + GstStructure *ssrc_map = + create_rtx_map ("application/x-rtp-ssrc-map", main_ssrc, rtx_ssrc); + GstStructure *pt_map = + create_rtx_map ("application/x-rtp-pt-map", main_pt, rtx_pt); + + g_object_set (h->element, "ssrc-map", ssrc_map, NULL); + g_object_set (h->element, "payload-type-map", pt_map, NULL); + + gst_harness_set_src_caps_str (h, "application/x-rtp, " + "media = (string)video, payload = (int)96, " + "ssrc = (uint)1234567, clock-rate = (int)90000, " + "encoding-name = (string)RAW"); + + /* push a packet */ + fail_unless_equals_int (GST_FLOW_OK, + gst_harness_push (h, create_rtp_buffer (main_ssrc, main_pt, 0))); + + /* and check it came through */ + pull_and_verify (h, FALSE, main_ssrc, main_pt, 0); + + /* now request this packet as rtx */ + gst_harness_push_upstream_event (h, create_rtx_event (main_ssrc, main_pt, 0)); + + /* and verify we got an rtx-packet for it */ + pull_and_verify (h, TRUE, rtx_ssrc, rtx_pt, 0); + + gst_structure_free (ssrc_map); + gst_structure_free (pt_map); + gst_harness_teardown (h); +} + +GST_END_TEST; + +GST_START_TEST (test_rtxsend_disabled_enabled_disabled) +{ + const guint32 main_ssrc = 1234567; + const guint main_pt = 96; + const guint32 rtx_ssrc = 7654321; + const guint rtx_pt = 106; + + GstHarness *h = gst_harness_new ("rtprtxsend"); + GstStructure *ssrc_map = + create_rtx_map ("application/x-rtp-ssrc-map", main_ssrc, rtx_ssrc); + GstStructure *pt_map = + create_rtx_map ("application/x-rtp-pt-map", main_pt, rtx_pt); + GstStructure *empty_pt_map = + gst_structure_new_empty ("application/x-rtp-pt-map"); + + /* set ssrc-map, but not pt-map, making the element work in passthrough */ + g_object_set (h->element, "ssrc-map", ssrc_map, NULL); + + gst_harness_set_src_caps_str (h, "application/x-rtp, " + "media = (string)video, payload = (int)96, " + "ssrc = (uint)1234567, clock-rate = (int)90000, " + "encoding-name = (string)RAW"); + + /* push, pull, request-rtx, verify nothing arrives */ + fail_unless_equals_int (GST_FLOW_OK, + gst_harness_push (h, create_rtp_buffer (main_ssrc, main_pt, 0))); + pull_and_verify (h, FALSE, main_ssrc, main_pt, 0); + gst_harness_push_upstream_event (h, create_rtx_event (main_ssrc, main_pt, 0)); + fail_unless_equals_int (0, gst_harness_buffers_in_queue (h)); + /* verify there is no task on the rtxsend srcpad */ + fail_unless (GST_PAD_TASK (GST_PAD_PEER (h->sinkpad)) == NULL); + + /* now enable rtx by setting the pt-map */ + g_object_set (h->element, "payload-type-map", pt_map, NULL); + + /* push, pull, request rtx, pull rtx */ + fail_unless_equals_int (GST_FLOW_OK, + gst_harness_push (h, create_rtp_buffer (main_ssrc, main_pt, 1))); + pull_and_verify (h, FALSE, main_ssrc, main_pt, 1); + gst_harness_push_upstream_event (h, create_rtx_event (main_ssrc, main_pt, 1)); + pull_and_verify (h, TRUE, rtx_ssrc, rtx_pt, 1); + /* verify there is a task on the rtxsend srcpad */ + fail_unless (GST_PAD_TASK (GST_PAD_PEER (h->sinkpad)) != NULL); + + /* now enable disable rtx agian by setting an empty pt-map */ + g_object_set (h->element, "payload-type-map", empty_pt_map, NULL); + + /* push, pull, request-rtx, verify nothing arrives */ + fail_unless_equals_int (GST_FLOW_OK, + gst_harness_push (h, create_rtp_buffer (main_ssrc, main_pt, 2))); + pull_and_verify (h, FALSE, main_ssrc, main_pt, 2); + gst_harness_push_upstream_event (h, create_rtx_event (main_ssrc, main_pt, 2)); + fail_unless_equals_int (0, gst_harness_buffers_in_queue (h)); + /* verify the task is gone again */ + fail_unless (GST_PAD_TASK (GST_PAD_PEER (h->sinkpad)) == NULL); + + gst_structure_free (ssrc_map); + gst_structure_free (pt_map); + gst_structure_free (empty_pt_map); + gst_harness_teardown (h); +} + +GST_END_TEST; + +GST_START_TEST (test_rtxsend_configured_not_playing_cleans_up) +{ + GstElement *rtxsend = gst_element_factory_make ("rtprtxsend", NULL); + GstStructure *ssrc_map = + create_rtx_map ("application/x-rtp-ssrc-map", 123, 96); + GstStructure *pt_map = create_rtx_map ("application/x-rtp-pt-map", 321, 106); + + g_object_set (rtxsend, "ssrc-map", ssrc_map, NULL); + g_object_set (rtxsend, "payload-type-map", pt_map, NULL); + gst_structure_free (ssrc_map); + gst_structure_free (pt_map); + + g_usleep (G_USEC_PER_SEC); + + gst_object_unref (rtxsend); +} + +GST_END_TEST; + + GST_START_TEST (test_rtxreceive_empty_rtx_packet) { guint rtx_ssrc = 7654321; @@ -788,6 +923,10 @@ rtprtx_suite (void) suite_add_tcase (s, tc_chain); + tcase_add_test (tc_chain, test_rtxsend_basic); + tcase_add_test (tc_chain, test_rtxsend_disabled_enabled_disabled); + tcase_add_test (tc_chain, test_rtxsend_configured_not_playing_cleans_up); + tcase_add_test (tc_chain, test_rtxreceive_empty_rtx_packet); tcase_add_test (tc_chain, test_rtxsend_rtxreceive); tcase_add_test (tc_chain, test_rtxsend_rtxreceive_with_packet_loss);