rtprtxsend: fix data locking when creating rtx packets

This patch moves the creation of rtx packets to be done early,
in the src_event() function, when they are requested. The purpose
is to run gst_rtp_rtx_buffer_new() with the object locked to
protect internal data, because if it is done at the pushing stage,
we would have to lock and unlock multiple times in a row while we
are pushing the rtx buffers.

Previously there was no locking at all, which was terribly wrong.
This commit is contained in:
George Kiagiadakis 2014-01-14 13:01:41 +01:00
parent 3d9ca102c9
commit 55746eaa4c

View file

@ -304,6 +304,81 @@ gst_rtp_rtx_send_get_ssrc_data (GstRtpRtxSend * rtx, guint32 ssrc)
return data;
}
/* Copy fixed header and extension. Add OSN before to copy payload
* Copy memory to avoid to manually copy each rtp buffer field.
*/
static GstBuffer *
gst_rtp_rtx_buffer_new (GstRtpRtxSend * rtx, GstBuffer * buffer)
{
GstMemory *mem = NULL;
GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
GstRTPBuffer new_rtp = GST_RTP_BUFFER_INIT;
GstBuffer *new_buffer = gst_buffer_new ();
GstMapInfo map;
guint payload_len = 0;
SSRCRtxData *data;
guint32 ssrc;
guint16 seqnum;
guint8 fmtp;
gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp);
/* get needed data from GstRtpRtxSend */
ssrc = gst_rtp_buffer_get_ssrc (&rtp);
data = gst_rtp_rtx_send_get_ssrc_data (rtx, ssrc);
ssrc = data->rtx_ssrc;
seqnum = data->next_seqnum++;
fmtp = GPOINTER_TO_UINT (g_hash_table_lookup (rtx->rtx_pt_map,
GUINT_TO_POINTER (gst_rtp_buffer_get_payload_type (&rtp))));
GST_DEBUG_OBJECT (rtx,
"retransmit seqnum: %" G_GUINT16_FORMAT ", ssrc: %" G_GUINT32_FORMAT,
seqnum, ssrc);
/* gst_rtp_buffer_map does not map the payload so do it now */
gst_rtp_buffer_get_payload (&rtp);
/* If payload type is not set through SDP/property then
* just bump the value */
if (fmtp < 96)
fmtp = gst_rtp_buffer_get_payload_type (&rtp) + 1;
/* copy fixed header */
mem = gst_memory_copy (rtp.map[0].memory, 0, rtp.size[0]);
gst_buffer_append_memory (new_buffer, mem);
/* copy extension if any */
if (rtp.size[1]) {
mem = gst_memory_copy (rtp.map[1].memory, 0, rtp.size[1]);
gst_buffer_append_memory (new_buffer, mem);
}
/* copy payload and add OSN just before */
payload_len = 2 + rtp.size[2];
mem = gst_allocator_alloc (NULL, payload_len, NULL);
gst_memory_map (mem, &map, GST_MAP_WRITE);
GST_WRITE_UINT16_BE (map.data, gst_rtp_buffer_get_seq (&rtp));
if (rtp.size[2])
memcpy (map.data + 2, rtp.data[2], rtp.size[2]);
gst_memory_unmap (mem, &map);
gst_buffer_append_memory (new_buffer, mem);
/* everything needed is copied */
gst_rtp_buffer_unmap (&rtp);
/* set ssrc, seqnum and fmtp */
gst_rtp_buffer_map (new_buffer, GST_MAP_WRITE, &new_rtp);
gst_rtp_buffer_set_ssrc (&new_rtp, ssrc);
gst_rtp_buffer_set_seq (&new_rtp, seqnum);
gst_rtp_buffer_set_payload_type (&new_rtp, fmtp);
/* RFC 4588: let other elements do the padding, as normal */
gst_rtp_buffer_set_padding (&new_rtp, FALSE);
gst_rtp_buffer_unmap (&new_rtp);
return new_buffer;
}
static gint
buffer_queue_items_cmp (BufferQueueItem * a, BufferQueueItem * b,
gpointer user_data)
@ -360,7 +435,8 @@ gst_rtp_rtx_send_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
if (iter) {
BufferQueueItem *item = g_sequence_get (iter);
GST_DEBUG_OBJECT (rtx, "found %" G_GUINT16_FORMAT, item->seqnum);
g_queue_push_tail (rtx->pending, gst_buffer_ref (item->buffer));
g_queue_push_tail (rtx->pending,
gst_rtp_rtx_buffer_new (rtx, item->buffer));
}
}
GST_OBJECT_UNLOCK (rtx);
@ -495,87 +571,12 @@ gst_rtp_rtx_send_get_ts_diff (SSRCRtxData * data)
return (guint32) gst_util_uint64_scale_int (result, 1000, data->clock_rate);
}
/* Copy fixed header and extension. Add OSN before to copy payload
* Copy memory to avoid to manually copy each rtp buffer field.
*/
static GstBuffer *
_gst_rtp_rtx_buffer_new (GstRtpRtxSend * rtx, GstBuffer * buffer)
{
GstMemory *mem = NULL;
GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
GstRTPBuffer new_rtp = GST_RTP_BUFFER_INIT;
GstBuffer *new_buffer = gst_buffer_new ();
GstMapInfo map;
guint payload_len = 0;
SSRCRtxData *data;
guint32 ssrc;
guint16 seqnum;
guint8 fmtp;
gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp);
/* get needed data from GstRtpRtxSend */
ssrc = gst_rtp_buffer_get_ssrc (&rtp);
data = gst_rtp_rtx_send_get_ssrc_data (rtx, ssrc);
ssrc = data->rtx_ssrc;
seqnum = data->next_seqnum++;
fmtp = GPOINTER_TO_UINT (g_hash_table_lookup (rtx->rtx_pt_map,
GUINT_TO_POINTER (gst_rtp_buffer_get_payload_type (&rtp))));
GST_DEBUG_OBJECT (rtx,
"retransmit seqnum: %" G_GUINT16_FORMAT ", ssrc: %" G_GUINT32_FORMAT,
seqnum, ssrc);
/* gst_rtp_buffer_map does not map the payload so do it now */
gst_rtp_buffer_get_payload (&rtp);
/* If payload type is not set through SDP/property then
* just bump the value */
if (fmtp < 96)
fmtp = gst_rtp_buffer_get_payload_type (&rtp) + 1;
/* copy fixed header */
mem = gst_memory_copy (rtp.map[0].memory, 0, rtp.size[0]);
gst_buffer_append_memory (new_buffer, mem);
/* copy extension if any */
if (rtp.size[1]) {
mem = gst_memory_copy (rtp.map[1].memory, 0, rtp.size[1]);
gst_buffer_append_memory (new_buffer, mem);
}
/* copy payload and add OSN just before */
payload_len = 2 + rtp.size[2];
mem = gst_allocator_alloc (NULL, payload_len, NULL);
gst_memory_map (mem, &map, GST_MAP_WRITE);
GST_WRITE_UINT16_BE (map.data, gst_rtp_buffer_get_seq (&rtp));
if (rtp.size[2])
memcpy (map.data + 2, rtp.data[2], rtp.size[2]);
gst_memory_unmap (mem, &map);
gst_buffer_append_memory (new_buffer, mem);
/* everything needed is copied */
gst_rtp_buffer_unmap (&rtp);
/* set ssrc, seqnum and fmtp */
gst_rtp_buffer_map (new_buffer, GST_MAP_WRITE, &new_rtp);
gst_rtp_buffer_set_ssrc (&new_rtp, ssrc);
gst_rtp_buffer_set_seq (&new_rtp, seqnum);
gst_rtp_buffer_set_payload_type (&new_rtp, fmtp);
/* RFC 4588: let other elements do the padding, as normal */
gst_rtp_buffer_set_padding (&new_rtp, FALSE);
gst_rtp_buffer_unmap (&new_rtp);
return new_buffer;
}
/* push pending retransmission packet.
* it constructs rtx packet from original packets */
static void
do_push (GstBuffer * buffer, GstRtpRtxSend * rtx)
{
gst_pad_push (rtx->srcpad, _gst_rtp_rtx_buffer_new (rtx, buffer));
gst_pad_push (rtx->srcpad, buffer);
}
static GstFlowReturn
@ -638,7 +639,7 @@ gst_rtp_rtx_send_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
/* retransmit requested packets */
if (pending) {
g_queue_foreach (pending, (GFunc) do_push, rtx);
g_queue_free_full (pending, (GDestroyNotify) gst_buffer_unref);
g_queue_free (pending);
}
GST_LOG_OBJECT (rtx,