diff --git a/gst/rtp/gstrtpj2kdepay.c b/gst/rtp/gstrtpj2kdepay.c index 83ccd9c4a7..82411aa987 100644 --- a/gst/rtp/gstrtpj2kdepay.c +++ b/gst/rtp/gstrtpj2kdepay.c @@ -46,6 +46,16 @@ GST_STATIC_PAD_TEMPLATE ("sink", "clock-rate = (int) 90000, " "encoding-name = (string) \"JPEG2000\"") ); +typedef enum +{ + J2K_MARKER = 0xFF, + J2K_MARKER_SOC = 0x4F, + J2K_MARKER_SOT = 0x90, + J2K_MARKER_SOP = 0x91, + J2K_MARKER_SOD = 0x93, + J2K_MARKER_EOC = 0xD9 +} RtpJ2KMarker; + GST_BOILERPLATE (GstRtpJ2KDepay, gst_rtp_j2k_depay, GstBaseRTPDepayload, GST_TYPE_BASE_RTP_DEPAYLOAD); @@ -102,7 +112,40 @@ static void gst_rtp_j2k_depay_init (GstRtpJ2KDepay * rtpj2kdepay, GstRtpJ2KDepayClass * klass) { - rtpj2kdepay->adapter = gst_adapter_new (); + rtpj2kdepay->pu_adapter = gst_adapter_new (); + rtpj2kdepay->t_adapter = gst_adapter_new (); + rtpj2kdepay->f_adapter = gst_adapter_new (); +} + +static void +store_mheader (GstRtpJ2KDepay * rtpj2kdepay, guint idx, GstBuffer * buf) +{ + GstBuffer *old; + + GST_DEBUG_OBJECT (rtpj2kdepay, "storing main header %p at index %u", buf, + idx); + if ((old = rtpj2kdepay->MH[idx])) + gst_buffer_unref (old); + rtpj2kdepay->MH[idx] = buf; +} + +static void +clear_mheaders (GstRtpJ2KDepay * rtpj2kdepay) +{ + guint i; + + for (i = 0; i < 8; i++) + store_mheader (rtpj2kdepay, i, NULL); +} + +static void +gst_rtp_j2k_depay_reset (GstRtpJ2KDepay * rtpj2kdepay) +{ + clear_mheaders (rtpj2kdepay); + gst_adapter_clear (rtpj2kdepay->pu_adapter); + gst_adapter_clear (rtpj2kdepay->t_adapter); + gst_adapter_clear (rtpj2kdepay->f_adapter); + rtpj2kdepay->next_frag = 0; } static void @@ -112,8 +155,11 @@ gst_rtp_j2k_depay_finalize (GObject * object) rtpj2kdepay = GST_RTP_J2K_DEPAY (object); - g_object_unref (rtpj2kdepay->adapter); - rtpj2kdepay->adapter = NULL; + clear_mheaders (rtpj2kdepay); + + g_object_unref (rtpj2kdepay->pu_adapter); + g_object_unref (rtpj2kdepay->t_adapter); + g_object_unref (rtpj2kdepay->f_adapter); G_OBJECT_CLASS (parent_class)->finalize (object); } @@ -142,65 +188,185 @@ gst_rtp_j2k_depay_setcaps (GstBaseRTPDepayload * depayload, GstCaps * caps) return res; } -static GstBuffer * -gst_rtp_j2k_depay_process (GstBaseRTPDepayload * depayload, GstBuffer * buf) +static void +gst_rtp_j2k_depay_clear_pu (GstRtpJ2KDepay * rtpj2kdepay) +{ + gst_adapter_clear (rtpj2kdepay->pu_adapter); + rtpj2kdepay->have_sync = FALSE; +} + +static GstFlowReturn +gst_rtp_j2k_depay_flush_pu (GstBaseRTPDepayload * depayload) { GstRtpJ2KDepay *rtpj2kdepay; - GstBuffer *outbuf; - guint8 *payload; - guint frag_offset; + GstBuffer *mheader; + guint avail, MHF, mh_id; rtpj2kdepay = GST_RTP_J2K_DEPAY (depayload); - /* flush everything on discont for now */ - if (GST_BUFFER_IS_DISCONT (buf)) { - GST_DEBUG_OBJECT (rtpj2kdepay, "DISCONT, flushing data"); - gst_adapter_clear (rtpj2kdepay->adapter); - rtpj2kdepay->need_header = TRUE; + /* take all available buffers */ + avail = gst_adapter_available (rtpj2kdepay->pu_adapter); + if (avail == 0) + goto done; + + MHF = rtpj2kdepay->pu_MHF; + mh_id = rtpj2kdepay->last_mh_id; + + GST_DEBUG_OBJECT (rtpj2kdepay, "flushing PU of size %u", avail); + + if (MHF == 0) { + GList *packets, *walk; + + packets = gst_adapter_take_list (rtpj2kdepay->pu_adapter, avail); + /* append packets */ + for (walk = packets; walk; walk = g_list_next (walk)) { + GstBuffer *buf = GST_BUFFER_CAST (walk->data); + GST_DEBUG_OBJECT (rtpj2kdepay, "append pu packet of size %u", + GST_BUFFER_SIZE (buf)); + gst_adapter_push (rtpj2kdepay->t_adapter, buf); + } + g_list_free (packets); + } else { + /* we have a header */ + GST_DEBUG_OBJECT (rtpj2kdepay, "keeping header %u", mh_id); + /* we managed to see the start and end of the header, take all from + * adapter and keep in header */ + mheader = gst_adapter_take_buffer (rtpj2kdepay->pu_adapter, avail); + + store_mheader (rtpj2kdepay, mh_id, mheader); } - if (gst_rtp_buffer_get_payload_len (buf) < 8) - goto empty_packet; +done: + rtpj2kdepay->have_sync = FALSE; - payload = gst_rtp_buffer_get_payload (buf); + return GST_FLOW_OK; +} - /* - * 0 1 2 3 - * 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 - * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - * |tp |MHF|mh_id|T| priority | tile number | - * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - * |reserved | fragment offset | - * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - */ - frag_offset = (payload[5] << 16) | (payload[6] << 8) | payload[7]; +static GstFlowReturn +gst_rtp_j2k_depay_flush_tile (GstBaseRTPDepayload * depayload) +{ + GstRtpJ2KDepay *rtpj2kdepay; + guint avail, mh_id; + GList *packets, *walk; + guint8 end[2]; + GstFlowReturn ret = GST_FLOW_OK; - GST_DEBUG_OBJECT (rtpj2kdepay, "frag %u", frag_offset); + rtpj2kdepay = GST_RTP_J2K_DEPAY (depayload); - if (rtpj2kdepay->need_header) { - if (frag_offset != 0) + /* flush pending PU */ + gst_rtp_j2k_depay_flush_pu (depayload); + + /* take all available buffers */ + avail = gst_adapter_available (rtpj2kdepay->t_adapter); + if (avail == 0) + goto done; + + mh_id = rtpj2kdepay->last_mh_id; + + GST_DEBUG_OBJECT (rtpj2kdepay, "flushing tile of size %u", avail); + + if (gst_adapter_available (rtpj2kdepay->f_adapter) == 0) { + GstBuffer *mheader; + + /* we need a header now */ + if ((mheader = rtpj2kdepay->MH[mh_id]) == NULL) goto waiting_header; - rtpj2kdepay->need_header = FALSE; + /* push header in the adapter */ + GST_DEBUG_OBJECT (rtpj2kdepay, "pushing header %u", mh_id); + gst_adapter_push (rtpj2kdepay->f_adapter, gst_buffer_ref (mheader)); } - /* take JPEG 2000 data, push in the adapter */ - outbuf = gst_rtp_buffer_get_payload_subbuffer (buf, 8, -1); - gst_adapter_push (rtpj2kdepay->adapter, outbuf); - outbuf = NULL; + /* check for last bytes */ + gst_adapter_copy (rtpj2kdepay->t_adapter, end, avail - 2, 2); - if (gst_rtp_buffer_get_marker (buf)) { - guint avail; - guint8 end[2]; - guint8 *data; + /* now append the tile packets to the frame */ + packets = gst_adapter_take_list (rtpj2kdepay->t_adapter, avail); + for (walk = packets; walk; walk = g_list_next (walk)) { + GstBuffer *buf = GST_BUFFER_CAST (walk->data); - /* last buffer take all data out of the adapter */ - avail = gst_adapter_available (rtpj2kdepay->adapter); - GST_DEBUG_OBJECT (rtpj2kdepay, "marker set, last buffer"); + if (walk == packets) { + guint8 *data; + guint size; + /* first buffer should contain the SOT */ + data = GST_BUFFER_DATA (buf); + size = GST_BUFFER_SIZE (buf); + + if (size < 12) + goto invalid_tile; + + if (data[0] == 0xff && data[1] == J2K_MARKER_SOT) { + guint Psot, nPsot; + + if (end[0] == 0xff && end[1] == J2K_MARKER_EOC) + nPsot = avail - 2; + else + nPsot = avail; + + Psot = GST_READ_UINT32_BE (&data[6]); + if (Psot != nPsot && Psot != 0) { + /* Psot must match the size of the tile */ + GST_DEBUG_OBJECT (rtpj2kdepay, "set Psot from %u to %u", Psot, nPsot); + buf = gst_buffer_make_writable (buf); + data = GST_BUFFER_DATA (buf); + GST_WRITE_UINT32_BE (&data[6], nPsot); + } + } + } + + GST_DEBUG_OBJECT (rtpj2kdepay, "append pu packet of size %u", + GST_BUFFER_SIZE (buf)); + gst_adapter_push (rtpj2kdepay->f_adapter, buf); + } + g_list_free (packets); + +done: + rtpj2kdepay->last_tile = -1; + + return ret; + + /* errors */ +waiting_header: + { + GST_DEBUG_OBJECT (rtpj2kdepay, "waiting for header %u", mh_id); + gst_adapter_clear (rtpj2kdepay->t_adapter); + rtpj2kdepay->last_tile = -1; + return ret; + } +invalid_tile: + { + GST_ELEMENT_WARNING (rtpj2kdepay, STREAM, DECODE, ("Invalid tile"), (NULL)); + gst_adapter_clear (rtpj2kdepay->t_adapter); + rtpj2kdepay->last_tile = -1; + return ret; + } +} + +static GstFlowReturn +gst_rtp_j2k_depay_flush_frame (GstBaseRTPDepayload * depayload) +{ + GstRtpJ2KDepay *rtpj2kdepay; + guint8 end[2]; + guint8 *data; + guint avail; + GstBuffer *outbuf; + GstFlowReturn ret = GST_FLOW_OK; + + rtpj2kdepay = GST_RTP_J2K_DEPAY (depayload); + + /* flush pending tile */ + gst_rtp_j2k_depay_flush_tile (depayload); + + /* last buffer take all data out of the adapter */ + avail = gst_adapter_available (rtpj2kdepay->f_adapter); + if (avail == 0) + goto done; + + if (avail > 2) { /* take the last bytes of the JPEG 2000 data to see if there is an EOC * marker */ - gst_adapter_copy (rtpj2kdepay->adapter, end, avail - 2, 2); + gst_adapter_copy (rtpj2kdepay->f_adapter, end, avail - 2, 2); if (end[0] != 0xff && end[1] != 0xd9) { GST_DEBUG_OBJECT (rtpj2kdepay, "no EOC marker, adding one"); @@ -211,15 +377,166 @@ gst_rtp_j2k_depay_process (GstBaseRTPDepayload * depayload, GstBuffer * buf) data[0] = 0xff; data[1] = 0xd9; - gst_adapter_push (rtpj2kdepay->adapter, outbuf); + gst_adapter_push (rtpj2kdepay->f_adapter, outbuf); avail += 2; } - outbuf = gst_adapter_take_buffer (rtpj2kdepay->adapter, avail); - GST_DEBUG_OBJECT (rtpj2kdepay, "returning %u bytes", avail); + GST_DEBUG_OBJECT (rtpj2kdepay, "pushing %u bytes", avail); + outbuf = gst_adapter_take_buffer (rtpj2kdepay->f_adapter, avail); + ret = gst_base_rtp_depayload_push (depayload, outbuf); + } else { + GST_WARNING_OBJECT (rtpj2kdepay, "empty packet"); + gst_adapter_clear (rtpj2kdepay->f_adapter); } - return outbuf; + /* we accept any mh_id now */ + rtpj2kdepay->last_mh_id = -1; + + /* reset state */ + rtpj2kdepay->next_frag = 0; + rtpj2kdepay->have_sync = FALSE; + +done: + /* we can't keep headers with mh_id of 0 */ + store_mheader (rtpj2kdepay, 0, NULL); + + return ret; +} + +static GstBuffer * +gst_rtp_j2k_depay_process (GstBaseRTPDepayload * depayload, GstBuffer * buf) +{ + GstRtpJ2KDepay *rtpj2kdepay; + guint8 *payload; + guint MHF, mh_id, frag_offset, tile, payload_len, j2klen; + gint gap; + guint32 rtptime; + + rtpj2kdepay = GST_RTP_J2K_DEPAY (depayload); + + payload = gst_rtp_buffer_get_payload (buf); + payload_len = gst_rtp_buffer_get_payload_len (buf); + + /* we need at least a header */ + if (payload_len < 8) + goto empty_packet; + + rtptime = gst_rtp_buffer_get_timestamp (buf); + + /* new timestamp marks new frame */ + if (rtpj2kdepay->last_rtptime != rtptime) { + rtpj2kdepay->last_rtptime = rtptime; + /* flush pending frame */ + gst_rtp_j2k_depay_flush_frame (depayload); + } + + /* + * 0 1 2 3 + * 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 + * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + * |tp |MHF|mh_id|T| priority | tile number | + * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + * |reserved | fragment offset | + * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + */ + MHF = (payload[0] & 0x30) >> 4; + mh_id = (payload[0] & 0xe) >> 1; + + if (rtpj2kdepay->last_mh_id == -1) + rtpj2kdepay->last_mh_id = mh_id; + else if (rtpj2kdepay->last_mh_id != mh_id) + goto wrong_mh_id; + + tile = (payload[2] << 8) | payload[3]; + frag_offset = (payload[5] << 16) | (payload[6] << 8) | payload[7]; + j2klen = payload_len - 8; + + GST_DEBUG_OBJECT (rtpj2kdepay, "MHF %u, tile %u, frag %u, expected %u", MHF, + tile, frag_offset, rtpj2kdepay->next_frag); + + /* calculate the gap between expected frag */ + gap = frag_offset - rtpj2kdepay->next_frag; + /* calculate next frag */ + rtpj2kdepay->next_frag = frag_offset + j2klen; + + if (gap != 0) { + GST_DEBUG_OBJECT (rtpj2kdepay, "discont of %d, clear PU", gap); + /* discont, clear pu adapter and resync */ + gst_rtp_j2k_depay_clear_pu (rtpj2kdepay); + } + + /* check for sync code */ + if (j2klen > 2 && payload[8] == 0xff) { + guint marker = payload[9]; + + /* packets must start with SOC, SOT or SOP */ + switch (marker) { + case J2K_MARKER_SOC: + GST_DEBUG_OBJECT (rtpj2kdepay, "found SOC packet"); + /* flush the previous frame, should have happened when the timestamp + * changed above. */ + gst_rtp_j2k_depay_flush_frame (depayload); + rtpj2kdepay->have_sync = TRUE; + break; + case J2K_MARKER_SOT: + /* flush the previous tile */ + gst_rtp_j2k_depay_flush_tile (depayload); + GST_DEBUG_OBJECT (rtpj2kdepay, "found SOT packet"); + rtpj2kdepay->have_sync = TRUE; + /* we sync on the tile now */ + rtpj2kdepay->last_tile = tile; + break; + case J2K_MARKER_SOP: + GST_DEBUG_OBJECT (rtpj2kdepay, "found SOP packet"); + /* flush the previous PU */ + gst_rtp_j2k_depay_flush_pu (depayload); + if (rtpj2kdepay->last_tile != tile) { + /* wrong tile, we lose sync and we need a new SOT or SOC to regain + * sync. First flush out the previous tile if we have one. */ + if (rtpj2kdepay->last_tile != -1) + gst_rtp_j2k_depay_flush_tile (depayload); + /* now we have no more valid tile and no sync */ + rtpj2kdepay->last_tile = -1; + rtpj2kdepay->have_sync = FALSE; + } else { + rtpj2kdepay->have_sync = TRUE; + } + break; + default: + GST_DEBUG_OBJECT (rtpj2kdepay, "no sync packet 0x%02d", marker); + break; + } + } + + if (rtpj2kdepay->have_sync) { + GstBuffer *pu_frag; + + if (gst_adapter_available (rtpj2kdepay->pu_adapter) == 0) { + /* first part of pu, record state */ + GST_DEBUG_OBJECT (rtpj2kdepay, "first PU"); + rtpj2kdepay->pu_MHF = MHF; + } + /* and push in pu adapter */ + GST_DEBUG_OBJECT (rtpj2kdepay, "push pu of size %u in adapter", j2klen); + pu_frag = gst_rtp_buffer_get_payload_subbuffer (buf, 8, -1); + gst_adapter_push (rtpj2kdepay->pu_adapter, pu_frag); + + if (MHF & 2) { + /* last part of main header received, we can flush it */ + GST_DEBUG_OBJECT (rtpj2kdepay, "header end, flush pu"); + gst_rtp_j2k_depay_flush_pu (depayload); + } + } else { + GST_DEBUG_OBJECT (rtpj2kdepay, "discard packet, no sync"); + } + + /* marker bit finishes the frame */ + if (gst_rtp_buffer_get_marker (buf)) { + GST_DEBUG_OBJECT (rtpj2kdepay, "marker set, last buffer"); + /* then flush frame */ + gst_rtp_j2k_depay_flush_frame (depayload); + } + return NULL; /* ERRORS */ empty_packet: @@ -228,9 +545,12 @@ empty_packet: ("Empty Payload."), (NULL)); return NULL; } -waiting_header: +wrong_mh_id: { - GST_DEBUG_OBJECT (rtpj2kdepay, "we are waiting for a header"); + GST_ELEMENT_WARNING (rtpj2kdepay, STREAM, DECODE, + ("Invalid mh_id %u, expected %u", mh_id, rtpj2kdepay->last_mh_id), + (NULL)); + gst_rtp_j2k_depay_clear_pu (rtpj2kdepay); return NULL; } } @@ -247,8 +567,7 @@ gst_rtp_j2k_depay_change_state (GstElement * element, GstStateChange transition) case GST_STATE_CHANGE_NULL_TO_READY: break; case GST_STATE_CHANGE_READY_TO_PAUSED: - gst_adapter_clear (rtpj2kdepay->adapter); - rtpj2kdepay->need_header = TRUE; + gst_rtp_j2k_depay_reset (rtpj2kdepay); break; default: break; @@ -258,7 +577,7 @@ gst_rtp_j2k_depay_change_state (GstElement * element, GstStateChange transition) switch (transition) { case GST_STATE_CHANGE_PAUSED_TO_READY: - gst_adapter_clear (rtpj2kdepay->adapter); + gst_rtp_j2k_depay_reset (rtpj2kdepay); break; case GST_STATE_CHANGE_READY_TO_NULL: break; diff --git a/gst/rtp/gstrtpj2kdepay.h b/gst/rtp/gstrtpj2kdepay.h index 41120af42e..e855eef940 100644 --- a/gst/rtp/gstrtpj2kdepay.h +++ b/gst/rtp/gstrtpj2kdepay.h @@ -44,8 +44,19 @@ struct _GstRtpJ2KDepay { GstBaseRTPDepayload depayload; - GstAdapter *adapter; - gboolean need_header; + guint64 last_rtptime; + guint last_mh_id; + guint last_tile; + + GstBuffer *MH[8]; + + guint pu_MHF; + GstAdapter *pu_adapter; + GstAdapter *t_adapter; + GstAdapter *f_adapter; + + guint next_frag; + gboolean have_sync; gint width, height; };