From 6502d08e437cc4287fdaabfadc828afeac761f32 Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Thu, 1 Nov 2012 15:54:58 +0000 Subject: [PATCH] gstpay: rewrite payloader Use adapter to assemble the payload and make a flush function to turn this payload into (fragmented) packets. Conflicts: gst/rtp/gstrtpgstpay.c gst/rtp/gstrtpgstpay.h --- gst/rtp/gstrtpgstpay.c | 268 +++++++++++++++++++---------------------- gst/rtp/gstrtpgstpay.h | 6 +- 2 files changed, 130 insertions(+), 144 deletions(-) diff --git a/gst/rtp/gstrtpgstpay.c b/gst/rtp/gstrtpgstpay.c index ffaae71a6c..0a1d8a4cd4 100644 --- a/gst/rtp/gstrtpgstpay.c +++ b/gst/rtp/gstrtpgstpay.c @@ -111,6 +111,7 @@ gst_rtp_gst_pay_class_init (GstRtpGSTPayClass * klass) static void gst_rtp_gst_pay_init (GstRtpGSTPay * rtpgstpay) { + rtpgstpay->adapter = gst_adapter_new (); } static void @@ -120,118 +121,32 @@ gst_rtp_gst_pay_finalize (GObject * obj) rtpgstpay = GST_RTP_GST_PAY (obj); - g_free (rtpgstpay->capsstr); + g_object_unref (rtpgstpay->adapter); G_OBJECT_CLASS (parent_class)->finalize (obj); } -static gboolean -gst_rtp_gst_pay_setcaps (GstRTPBasePayload * payload, GstCaps * caps) -{ - GstRtpGSTPay *rtpgstpay; - gboolean res; - gchar *capsenc, *capsver; - - rtpgstpay = GST_RTP_GST_PAY (payload); - - g_free (rtpgstpay->capsstr); - rtpgstpay->capsstr = gst_caps_to_string (caps); - rtpgstpay->capslen = strlen (rtpgstpay->capsstr); - rtpgstpay->current_CV = rtpgstpay->next_CV; - - /* encode without 0 byte */ - capsenc = g_base64_encode ((guchar *) rtpgstpay->capsstr, rtpgstpay->capslen); - GST_DEBUG_OBJECT (payload, "caps=%s, caps(base64)=%s", - rtpgstpay->capsstr, capsenc); - /* for 0 byte */ - rtpgstpay->capslen++; - - capsver = g_strdup_printf ("%d", rtpgstpay->current_CV); - - gst_rtp_base_payload_set_options (payload, "application", TRUE, "X-GST", - 90000); - res = - gst_rtp_base_payload_set_outcaps (payload, "caps", G_TYPE_STRING, capsenc, - "capsversion", G_TYPE_STRING, capsver, NULL); - g_free (capsenc); - g_free (capsver); - - return res; -} - static GstFlowReturn -gst_rtp_gst_pay_handle_buffer (GstRTPBasePayload * basepayload, - GstBuffer * buffer) +gst_rtp_gst_pay_flush (GstRtpGSTPay * rtpgstpay, GstClockTime timestamp) { - GstRtpGSTPay *rtpgstpay; - GstMapInfo map; - guint8 *ptr; - gsize left; - GstBuffer *outbuf; GstFlowReturn ret; - GstClockTime timestamp; - guint32 frag_offset; - guint flags; - gchar *capsstr; - guint capslen; - guint capslen_prefix_len; + guint avail; + guint frag_offset; - rtpgstpay = GST_RTP_GST_PAY (basepayload); - - gst_buffer_map (buffer, &map, GST_MAP_READ); - timestamp = GST_BUFFER_TIMESTAMP (buffer); - - ret = GST_FLOW_OK; - - /* caps always from SDP for now */ - flags = 0; - if (GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_DELTA_UNIT)) - flags |= (1 << 3); - - capsstr = rtpgstpay->capsstr; - capslen = rtpgstpay->capslen; - if (capslen) { - /* start of buffer, calculate length */ - capslen_prefix_len = 1; - while (capslen >> (7 * capslen_prefix_len)) - capslen_prefix_len++; - - GST_DEBUG_OBJECT (rtpgstpay, "sending inline caps"); - rtpgstpay->next_CV++; - - flags |= (1 << 7); - } else { - capslen_prefix_len = 0; - } - - flags |= (rtpgstpay->current_CV << 4); - - /* - * 0 1 2 3 - * 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 - * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - * |C| CV |D|X|Y|Z| MBZ | - * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - * | Frag_offset | - * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - */ frag_offset = 0; - ptr = map.data; - left = map.size; + avail = gst_adapter_available (rtpgstpay->adapter); - GST_DEBUG_OBJECT (basepayload, "buffer size=%u", left); - - while (left > 0) { + while (avail) { guint towrite; guint8 *payload; guint payload_len; guint packet_len; + GstBuffer *outbuf; GstRTPBuffer rtp = { NULL }; + /* this will be the total lenght of the packet */ - packet_len = - gst_rtp_buffer_calc_packet_len (8 + capslen + capslen_prefix_len + left, - 0, 0); + packet_len = gst_rtp_buffer_calc_packet_len (8 + avail, 0, 0); /* fill one MTU or all available bytes */ towrite = MIN (packet_len, GST_RTP_BASE_PAYLOAD_MTU (rtpgstpay)); @@ -245,10 +160,19 @@ gst_rtp_gst_pay_handle_buffer (GstRTPBasePayload * basepayload, gst_rtp_buffer_map (outbuf, GST_MAP_WRITE, &rtp); payload = gst_rtp_buffer_get_payload (&rtp); - GST_DEBUG_OBJECT (basepayload, "new packet len %u, frag %u", packet_len, + GST_DEBUG_OBJECT (rtpgstpay, "new packet len %u, frag %u", packet_len, frag_offset); - payload[0] = flags; + /* + * 0 1 2 3 + * 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 + * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + * |C| CV |D|0|0|0| MBZ | + * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + * | Frag_offset | + * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + */ + payload[0] = rtpgstpay->flags; payload[1] = payload[2] = payload[3] = 0; payload[4] = frag_offset >> 24; payload[5] = frag_offset >> 16; @@ -258,61 +182,121 @@ gst_rtp_gst_pay_handle_buffer (GstRTPBasePayload * basepayload, payload += 8; payload_len -= 8; - if (capslen) { - guint tocopy; + GST_DEBUG_OBJECT (rtpgstpay, "copy %u bytes from adapter", payload_len); - /* we need to write caps */ - if (frag_offset == 0) { - /* write caps length */ - while (capslen_prefix_len) { - capslen_prefix_len--; - *payload++ = ((capslen_prefix_len > 0) ? 0x80 : 0) | - ((capslen >> (7 * capslen_prefix_len)) & 0x7f); - payload_len--; - frag_offset++; - } - } + gst_adapter_copy (rtpgstpay->adapter, payload, 0, payload_len); + gst_adapter_flush (rtpgstpay->adapter, payload_len); - tocopy = MIN (payload_len, capslen); - GST_DEBUG_OBJECT (basepayload, "copy %u bytes from caps to payload", - tocopy); - memcpy (payload, capsstr, tocopy); + frag_offset += payload_len; + avail -= payload_len; - capsstr += tocopy; - capslen -= tocopy; - payload += tocopy; - payload_len -= tocopy; - frag_offset += tocopy; - - if (capslen == 0) { - rtpgstpay->capslen = 0; - g_free (rtpgstpay->capsstr); - rtpgstpay->capsstr = NULL; - } - } - - if (capslen == 0) { - /* no more caps, continue with data */ - GST_DEBUG_OBJECT (basepayload, "copy %u bytes from buffer to payload", - payload_len); - memcpy (payload, ptr, payload_len); - - ptr += payload_len; - left -= payload_len; - frag_offset += payload_len; - } - - if (left == 0) + if (avail == 0) gst_rtp_buffer_set_marker (&rtp, TRUE); gst_rtp_buffer_unmap (&rtp); GST_BUFFER_TIMESTAMP (outbuf) = timestamp; - ret = gst_rtp_base_payload_push (basepayload, outbuf); + ret = gst_rtp_base_payload_push (GST_RTP_BASE_PAYLOAD (rtpgstpay), outbuf); + if (ret != GST_FLOW_OK) + goto push_failed; } - gst_buffer_unmap (buffer, &map); - gst_buffer_unref (buffer); + rtpgstpay->flags &= 0x70; + + return GST_FLOW_OK; + + /* ERRORS */ +push_failed: + { + GST_DEBUG_OBJECT (rtpgstpay, "push failed %d (%s)", ret, + gst_flow_get_name (ret)); + gst_adapter_clear (rtpgstpay->adapter); + rtpgstpay->flags &= 0x70; + return ret; + } +} + +static gboolean +gst_rtp_gst_pay_setcaps (GstRTPBasePayload * payload, GstCaps * caps) +{ + GstRtpGSTPay *rtpgstpay; + gboolean res; + gchar *capsstr, *capsenc, *capsver; + guint capslen, capslen_prefix_len; + guint8 *ptr; + GstBuffer *outbuf; + GstMapInfo map; + + rtpgstpay = GST_RTP_GST_PAY (payload); + + capsstr = gst_caps_to_string (caps); + capslen = strlen (capsstr); + + rtpgstpay->current_CV = rtpgstpay->next_CV; + + /* encode without 0 byte */ + capsenc = g_base64_encode ((guchar *) capsstr, capslen); + GST_DEBUG_OBJECT (payload, "caps=%s, caps(base64)=%s", capsstr, capsenc); + /* for 0 byte */ + capslen++; + + /* start of buffer, calculate length */ + capslen_prefix_len = 1; + while (capslen >> (7 * capslen_prefix_len)) + capslen_prefix_len++; + + outbuf = gst_buffer_new_and_alloc (capslen + capslen_prefix_len); + + gst_buffer_map (outbuf, &map, GST_MAP_WRITE); + ptr = map.data; + + /* write caps length */ + while (capslen_prefix_len) { + capslen_prefix_len--; + *ptr++ = ((capslen_prefix_len > 0) ? 0x80 : 0) | + ((capslen >> (7 * capslen_prefix_len)) & 0x7f); + } + memcpy (ptr, capsstr, capslen); + gst_buffer_unmap (outbuf, &map); + g_free (capsstr); + + /* store in adapter, we don't flush yet, buffer will follow */ + rtpgstpay->flags = (1 << 7) | (rtpgstpay->current_CV << 4); + rtpgstpay->next_CV = (rtpgstpay->next_CV + 1) & 0x7; + gst_adapter_push (rtpgstpay->adapter, outbuf); + + gst_rtp_base_payload_set_options (payload, "application", TRUE, "X-GST", + 90000); + + /* make caps for SDP */ + capsver = g_strdup_printf ("%d", rtpgstpay->current_CV); + res = + gst_rtp_base_payload_set_outcaps (payload, "caps", G_TYPE_STRING, capsenc, + "capsversion", G_TYPE_STRING, capsver, NULL); + g_free (capsenc); + g_free (capsver); + + return res; +} + +static GstFlowReturn +gst_rtp_gst_pay_handle_buffer (GstRTPBasePayload * basepayload, + GstBuffer * buffer) +{ + GstFlowReturn ret; + GstRtpGSTPay *rtpgstpay; + GstClockTime timestamp; + + rtpgstpay = GST_RTP_GST_PAY (basepayload); + + timestamp = GST_BUFFER_TIMESTAMP (buffer); + + /* caps always from SDP for now */ + if (GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_DELTA_UNIT)) + rtpgstpay->flags |= (1 << 3); + + gst_adapter_push (rtpgstpay->adapter, buffer); + ret = gst_rtp_gst_pay_flush (rtpgstpay, timestamp); return ret; } diff --git a/gst/rtp/gstrtpgstpay.h b/gst/rtp/gstrtpgstpay.h index bde2982ccb..253fb412ce 100644 --- a/gst/rtp/gstrtpgstpay.h +++ b/gst/rtp/gstrtpgstpay.h @@ -22,6 +22,7 @@ #include #include +#include G_BEGIN_DECLS @@ -43,8 +44,9 @@ struct _GstRtpGSTPay { GstRTPBasePayload payload; - gchar *capsstr; - guint capslen; + GstAdapter *adapter; + guint8 flags; + guint8 current_CV; /* CV field of incoming caps*/ guint8 next_CV; };