/* GStreamer RTMP Library
 * Copyright (C) 2017 Make.TV, Inc. <info@make.tv>
 *   Contact: Jan Alexander Steffens (heftig) <jsteffens@make.tv>
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Library General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Library General Public License for more details.
 *
 * You should have received a copy of the GNU Library General Public
 * License along with this library; if not, write to the
 * Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
 * Boston, MA 02110-1335, USA.
 */

#ifdef HAVE_CONFIG_H
#include "config.h"
#endif

#include "rtmpchunkstream.h"
#include "rtmputils.h"

GST_DEBUG_CATEGORY_STATIC (gst_rtmp_chunk_stream_debug_category);
#define GST_CAT_DEFAULT gst_rtmp_chunk_stream_debug_category

static void
init_debug (void)
{
  static gsize done = 0;
  if (g_once_init_enter (&done)) {
    GST_DEBUG_CATEGORY_INIT (gst_rtmp_chunk_stream_debug_category,
        "rtmpchunkstream", 0, "debug category for rtmp chunk streams");
    g_once_init_leave (&done, 1);
  }
}

enum
{
  CHUNK_BYTE_TWOBYTE = 0,
  CHUNK_BYTE_THREEBYTE = 1,
  CHUNK_BYTE_MASK = 0x3f,
  CHUNK_STREAM_MIN_TWOBYTE = 0x40,
  CHUNK_STREAM_MIN_THREEBYTE = 0x140,
  CHUNK_STREAM_MAX_THREEBYTE = 0x1003f,
};

typedef enum
{
  CHUNK_TYPE_0 = 0,
  CHUNK_TYPE_1 = 1,
  CHUNK_TYPE_2 = 2,
  CHUNK_TYPE_3 = 3,
} ChunkType;

static const gsize chunk_header_sizes[4] = { 11, 7, 3, 0 };

struct _GstRtmpChunkStream
{
  GstBuffer *buffer;
  GstRtmpMeta *meta;
  GstMapInfo map;               /* Only used for parsing */
  guint32 id;
  guint32 offset;
  guint64 bytes;
};

struct _GstRtmpChunkStreams
{
  GArray *array;
};

static inline gboolean
chunk_stream_is_open (GstRtmpChunkStream * cstream)
{
  return cstream->map.data != NULL;
}

static void
chunk_stream_take_buffer (GstRtmpChunkStream * cstream, GstBuffer * buffer)
{
  GstRtmpMeta *meta = gst_buffer_get_rtmp_meta (buffer);
  g_assert (meta);
  g_assert (cstream->buffer == NULL);
  cstream->buffer = buffer;
  cstream->meta = meta;
}

static void
chunk_stream_clear (GstRtmpChunkStream * cstream)
{
  if (chunk_stream_is_open (cstream)) {
    gst_buffer_unmap (cstream->buffer, &cstream->map);
    cstream->map.data = NULL;
  }

  gst_buffer_replace (&cstream->buffer, NULL);
  cstream->meta = NULL;
  cstream->offset = 0;
}

static guint32
chunk_stream_next_size (GstRtmpChunkStream * cstream, guint32 chunk_size)
{
  guint32 size, offset;

  size = cstream->meta->size;
  offset = cstream->offset;

  g_return_val_if_fail (chunk_size, 0);
  g_return_val_if_fail (offset <= size, 0);
  return MIN (size - offset, chunk_size);
}

static inline gboolean
needs_ext_ts (GstRtmpMeta * meta)
{
  return meta->ts_delta >= 0xffffff;
}


static guint32
dts_to_abs_ts (GstBuffer * buffer)
{
  GstClockTime dts = GST_BUFFER_DTS (buffer);
  guint32 ret = 0;

  if (GST_CLOCK_TIME_IS_VALID (dts)) {
    ret = gst_util_uint64_scale_round (dts, 1, GST_MSECOND);
  }

  GST_TRACE ("Converted DTS %" GST_TIME_FORMAT " into abs TS %"
      G_GUINT32_FORMAT " ms", GST_TIME_ARGS (dts), ret);
  return ret;
}

static gboolean
dts_diff_to_delta_ts (GstBuffer * old_buffer, GstBuffer * buffer,
    guint32 * out_ts)
{
  GstClockTime dts = GST_BUFFER_DTS (buffer),
      old_dts = GST_BUFFER_DTS (old_buffer);
  guint32 abs_ts, old_abs_ts, delta_32 = 0;

  if (!GST_CLOCK_TIME_IS_VALID (dts) || !GST_CLOCK_TIME_IS_VALID (old_dts)) {
    GST_LOG ("Timestamps not valid; using delta TS 0");
    goto out;
  }

  if (ABS (GST_CLOCK_DIFF (old_dts, dts)) > GST_MSECOND * G_MAXINT32) {
    GST_WARNING ("Timestamp delta too large: %" GST_TIME_FORMAT " -> %"
        GST_TIME_FORMAT, GST_TIME_ARGS (old_dts), GST_TIME_ARGS (dts));
    return FALSE;
  }

  abs_ts = gst_util_uint64_scale_round (dts, 1, GST_MSECOND);
  old_abs_ts = gst_util_uint64_scale_round (old_dts, 1, GST_MSECOND);

  /* underflow wraps around */
  delta_32 = abs_ts - old_abs_ts;

  GST_TRACE ("Converted DTS %" GST_TIME_FORMAT " (%" G_GUINT32_FORMAT
      " ms) -> %" GST_TIME_FORMAT " (%" G_GUINT32_FORMAT " ms) into delta TS %"
      G_GUINT32_FORMAT " ms", GST_TIME_ARGS (old_dts), old_abs_ts,
      GST_TIME_ARGS (dts), abs_ts, delta_32);

out:
  *out_ts = delta_32;
  return TRUE;
}

static ChunkType
select_chunk_type (GstRtmpChunkStream * cstream, GstBuffer * buffer)
{
  GstBuffer *old_buffer = cstream->buffer;
  GstRtmpMeta *meta, *old_meta;

  g_return_val_if_fail (buffer, -1);

  meta = gst_buffer_get_rtmp_meta (buffer);

  g_return_val_if_fail (meta, -1);
  g_return_val_if_fail (gst_rtmp_message_type_is_valid (meta->type), -1);

  meta->size = gst_buffer_get_size (buffer);
  meta->cstream = cstream->id;

  g_return_val_if_fail (meta->size <= GST_RTMP_MAXIMUM_MESSAGE_SIZE, -1);

  if (!old_buffer) {
    GST_TRACE ("Picking header 0: no previous header");
    meta->ts_delta = dts_to_abs_ts (buffer);
    return CHUNK_TYPE_0;
  }

  old_meta = gst_buffer_get_rtmp_meta (old_buffer);
  g_return_val_if_fail (old_meta, -1);

  if (old_meta->mstream != meta->mstream) {
    GST_TRACE ("Picking header 0: stream mismatch; "
        "want %" G_GUINT32_FORMAT " got %" G_GUINT32_FORMAT,
        old_meta->mstream, meta->mstream);
    meta->ts_delta = dts_to_abs_ts (buffer);
    return CHUNK_TYPE_0;
  }

  if (!dts_diff_to_delta_ts (old_buffer, buffer, &meta->ts_delta)) {
    GST_TRACE ("Picking header 0: timestamp delta overflow");
    meta->ts_delta = dts_to_abs_ts (buffer);
    return CHUNK_TYPE_0;
  }

  /* now at least type 1 */

  if (old_meta->type != meta->type) {
    GST_TRACE ("Picking header 1: type mismatch; want %d got %d",
        old_meta->type, meta->type);
    return CHUNK_TYPE_1;
  }

  if (old_meta->size != meta->size) {
    GST_TRACE ("Picking header 1: size mismatch; "
        "want %" G_GUINT32_FORMAT " got %" G_GUINT32_FORMAT,
        old_meta->size, meta->size);
    return CHUNK_TYPE_1;
  }

  /* now at least type 2 */

  if (old_meta->ts_delta != meta->ts_delta) {
    GST_TRACE ("Picking header 2: timestamp delta mismatch; "
        "want %" G_GUINT32_FORMAT " got %" G_GUINT32_FORMAT,
        old_meta->ts_delta, meta->ts_delta);
    return CHUNK_TYPE_2;
  }

  /* now at least type 3 */

  GST_TRACE ("Picking header 3");
  return CHUNK_TYPE_3;
}

static GstBuffer *
serialize_next (GstRtmpChunkStream * cstream, guint32 chunk_size,
    ChunkType type)
{
  GstRtmpMeta *meta = cstream->meta;
  guint8 small_stream_id;
  gsize header_size = chunk_header_sizes[type], offset;
  gboolean ext_ts;
  GstBuffer *ret;
  GstMapInfo map;

  GST_TRACE ("Serializing a chunk of type %d, offset %" G_GUINT32_FORMAT,
      type, cstream->offset);

  if (cstream->id < CHUNK_STREAM_MIN_TWOBYTE) {
    small_stream_id = cstream->id;
    header_size += 1;
  } else if (cstream->id < CHUNK_STREAM_MIN_THREEBYTE) {
    small_stream_id = CHUNK_BYTE_TWOBYTE;
    header_size += 2;
  } else {
    small_stream_id = CHUNK_BYTE_THREEBYTE;
    header_size += 3;
  }

  ext_ts = needs_ext_ts (meta);
  if (ext_ts) {
    header_size += 4;
  }

  GST_TRACE ("Allocating buffer, header size %" G_GSIZE_FORMAT, header_size);

  ret = gst_buffer_new_allocate (NULL, header_size, NULL);
  if (!ret) {
    GST_ERROR ("Failed to allocate chunk buffer");
    return NULL;
  }

  if (!gst_buffer_map (ret, &map, GST_MAP_WRITE)) {
    GST_ERROR ("Failed to map %" GST_PTR_FORMAT, ret);
    gst_buffer_unref (ret);
    return NULL;
  }

  /* Chunk Basic Header */
  GST_WRITE_UINT8 (map.data, (type << 6) | small_stream_id);
  offset = 1;

  switch (small_stream_id) {
    case CHUNK_BYTE_TWOBYTE:
      GST_WRITE_UINT8 (map.data + 1, cstream->id - CHUNK_STREAM_MIN_TWOBYTE);
      offset += 1;
      break;

    case CHUNK_BYTE_THREEBYTE:
      GST_WRITE_UINT16_LE (map.data + 1,
          cstream->id - CHUNK_STREAM_MIN_TWOBYTE);
      offset += 2;
      break;
  }

  switch (type) {
    case CHUNK_TYPE_0:
      /* SRSLY:  "Message stream ID is stored in little-endian format." */
      GST_WRITE_UINT32_LE (map.data + offset + 7, meta->mstream);
      /* no break */
    case CHUNK_TYPE_1:
      GST_WRITE_UINT24_BE (map.data + offset + 3, meta->size);
      GST_WRITE_UINT8 (map.data + offset + 6, meta->type);
      /* no break */
    case CHUNK_TYPE_2:
      GST_WRITE_UINT24_BE (map.data + offset,
          ext_ts ? 0xffffff : meta->ts_delta);
      /* no break */
    case CHUNK_TYPE_3:
      offset += chunk_header_sizes[type];

      if (ext_ts) {
        GST_WRITE_UINT32_BE (map.data + offset, meta->ts_delta);
        offset += 4;
      }
  }

  g_assert (offset == header_size);
  GST_MEMDUMP (">>> chunk header", map.data, offset);

  gst_buffer_unmap (ret, &map);

  GST_BUFFER_OFFSET (ret) = GST_BUFFER_OFFSET_IS_VALID (cstream->buffer) ?
      GST_BUFFER_OFFSET (cstream->buffer) + cstream->offset : cstream->bytes;
  GST_BUFFER_OFFSET_END (ret) = GST_BUFFER_OFFSET (ret);

  if (meta->size > 0) {
    guint32 payload_size = chunk_stream_next_size (cstream, chunk_size);

    GST_TRACE ("Appending %" G_GUINT32_FORMAT " bytes of payload",
        payload_size);

    gst_buffer_copy_into (ret, cstream->buffer, GST_BUFFER_COPY_MEMORY,
        cstream->offset, payload_size);

    GST_BUFFER_OFFSET_END (ret) += payload_size;
    cstream->offset += payload_size;
    cstream->bytes += payload_size;
  } else {
    GST_TRACE ("Chunk has no payload");
  }

  gst_rtmp_buffer_dump (ret, ">>> chunk");

  return ret;
}

void
gst_rtmp_chunk_stream_clear (GstRtmpChunkStream * cstream)
{
  g_return_if_fail (cstream);
  GST_LOG ("Clearing chunk stream %" G_GUINT32_FORMAT, cstream->id);
  chunk_stream_clear (cstream);
}

guint32
gst_rtmp_chunk_stream_parse_id (const guint8 * data, gsize size)
{
  guint32 ret;

  if (size < 1) {
    GST_TRACE ("Not enough bytes to read ID");
    return 0;
  }

  ret = GST_READ_UINT8 (data) & CHUNK_BYTE_MASK;

  switch (ret) {
    case CHUNK_BYTE_TWOBYTE:
      if (size < 2) {
        GST_TRACE ("Not enough bytes to read two-byte ID");
        return 0;
      }

      ret = GST_READ_UINT8 (data + 1) + CHUNK_STREAM_MIN_TWOBYTE;
      break;

    case CHUNK_BYTE_THREEBYTE:
      if (size < 3) {
        GST_TRACE ("Not enough bytes to read three-byte ID");
        return 0;
      }

      ret = GST_READ_UINT16_LE (data + 1) + CHUNK_STREAM_MIN_TWOBYTE;
      break;
  }

  GST_TRACE ("Parsed chunk stream ID %" G_GUINT32_FORMAT, ret);
  return ret;
}

guint32
gst_rtmp_chunk_stream_parse_header (GstRtmpChunkStream * cstream,
    const guint8 * data, gsize size)
{
  GstBuffer *buffer;
  GstRtmpMeta *meta;
  const guint8 *message_header;
  guint32 header_size;
  ChunkType type;
  gboolean has_abs_timestamp = FALSE;

  g_return_val_if_fail (cstream, 0);
  g_return_val_if_fail (cstream->id == gst_rtmp_chunk_stream_parse_id (data,
          size), 0);

  type = GST_READ_UINT8 (data) >> 6;
  GST_TRACE ("Parsing chunk stream %" G_GUINT32_FORMAT " header type %d",
      cstream->id, type);

  switch (GST_READ_UINT8 (data) & CHUNK_BYTE_MASK) {
    case CHUNK_BYTE_TWOBYTE:
      header_size = 2;
      break;
    case CHUNK_BYTE_THREEBYTE:
      header_size = 3;
      break;
    default:
      header_size = 1;
      break;
  }

  message_header = data + header_size;
  header_size += chunk_header_sizes[type];

  if (cstream->buffer) {
    buffer = cstream->buffer;
    meta = cstream->meta;
    g_assert (meta->cstream == cstream->id);
  } else {
    buffer = gst_buffer_new ();
    GST_BUFFER_DTS (buffer) = 0;
    GST_BUFFER_OFFSET (buffer) = cstream->bytes;
    GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);

    meta = gst_buffer_add_rtmp_meta (buffer);
    meta->cstream = cstream->id;

    chunk_stream_take_buffer (cstream, buffer);
    GST_DEBUG ("Starting parse with new %" GST_PTR_FORMAT, buffer);
  }

  if (size < header_size) {
    GST_TRACE ("not enough bytes to read header");
    return header_size;
  }

  switch (type) {
    case CHUNK_TYPE_0:
      has_abs_timestamp = TRUE;
      /* SRSLY:  "Message stream ID is stored in little-endian format." */
      meta->mstream = GST_READ_UINT32_LE (message_header + 7);
      /* no break */
    case CHUNK_TYPE_1:
      meta->type = GST_READ_UINT8 (message_header + 6);
      meta->size = GST_READ_UINT24_BE (message_header + 3);
      /* no break */
    case CHUNK_TYPE_2:
      meta->ts_delta = GST_READ_UINT24_BE (message_header);
      /* no break */
    case CHUNK_TYPE_3:
      if (needs_ext_ts (meta)) {
        guint32 timestamp;

        if (size < header_size + 4) {
          GST_TRACE ("not enough bytes to read extended timestamp");
          return header_size + 4;
        }

        GST_TRACE ("Reading extended timestamp");
        timestamp = GST_READ_UINT32_BE (data + header_size);

        if (type == 3 && meta->ts_delta != timestamp) {
          GST_WARNING ("Type 3 extended timestamp does not match expected"
              " timestamp (want %" G_GUINT32_FORMAT " got %" G_GUINT32_FORMAT
              "); assuming it's not present", meta->ts_delta, timestamp);
        } else {
          meta->ts_delta = timestamp;
          header_size += 4;
        }
      }
  }

  GST_MEMDUMP ("<<< chunk header", data, header_size);

  if (!chunk_stream_is_open (cstream)) {
    GstClockTime dts = GST_BUFFER_DTS (buffer);
    guint32 delta_32, abs_32;
    gint64 delta_64;

    if (has_abs_timestamp) {
      abs_32 = meta->ts_delta;
      delta_32 = abs_32 - dts / GST_MSECOND;
    } else {
      delta_32 = meta->ts_delta;
      abs_32 = delta_32 + dts / GST_MSECOND;
    }

    GST_TRACE ("Timestamp delta is %" G_GUINT32_FORMAT " (absolute %"
        G_GUINT32_FORMAT ")", delta_32, abs_32);

    /* emulate signed overflow */
    delta_64 = delta_32;
    if (delta_64 > G_MAXINT32) {
      delta_64 -= G_MAXUINT32;
      delta_64 -= 1;
    }

    delta_64 *= GST_MSECOND;

    if (G_LIKELY (delta_64 >= 0)) {
      /* Normal advancement */
    } else if (G_LIKELY ((guint64) (-delta_64) <= dts)) {
      /* In-bounds regression */
      GST_WARNING ("Timestamp regression: %" GST_STIME_FORMAT,
          GST_STIME_ARGS (delta_64));
    } else {
      /* Out-of-bounds regression */
      GST_WARNING ("Timestamp regression: %" GST_STIME_FORMAT ", offsetting",
          GST_STIME_ARGS (delta_64));
      delta_64 = delta_32 * GST_MSECOND;
    }

    GST_BUFFER_DTS (buffer) += delta_64;

    GST_TRACE ("Adjusted buffer DTS (%" GST_TIME_FORMAT ") by %"
        GST_STIME_FORMAT " to %" GST_TIME_FORMAT, GST_TIME_ARGS (dts),
        GST_STIME_ARGS (delta_64), GST_TIME_ARGS (GST_BUFFER_DTS (buffer)));
  } else {
    GST_TRACE ("Message payload already started; not touching timestamp");
  }

  return header_size;
}

guint32
gst_rtmp_chunk_stream_parse_payload (GstRtmpChunkStream * cstream,
    guint32 chunk_size, guint8 ** data)
{
  GstMemory *mem;

  g_return_val_if_fail (cstream, 0);
  g_return_val_if_fail (cstream->buffer, 0);

  if (!chunk_stream_is_open (cstream)) {
    guint32 size = cstream->meta->size;

    GST_TRACE ("Allocating buffer, payload size %" G_GUINT32_FORMAT, size);

    mem = gst_allocator_alloc (NULL, size, 0);
    if (!mem) {
      GST_ERROR ("Failed to allocate buffer for payload size %"
          G_GUINT32_FORMAT, size);
      return 0;
    }

    gst_buffer_append_memory (cstream->buffer, mem);
    gst_buffer_map (cstream->buffer, &cstream->map, GST_MAP_WRITE);
  }

  g_return_val_if_fail (cstream->map.size == cstream->meta->size, 0);

  if (data) {
    *data = cstream->map.data + cstream->offset;
  }

  return chunk_stream_next_size (cstream, chunk_size);
}

guint32
gst_rtmp_chunk_stream_wrote_payload (GstRtmpChunkStream * cstream,
    guint32 chunk_size)
{
  guint32 size;

  g_return_val_if_fail (cstream, FALSE);
  g_return_val_if_fail (chunk_stream_is_open (cstream), FALSE);

  size = chunk_stream_next_size (cstream, chunk_size);
  cstream->offset += size;
  cstream->bytes += size;

  return chunk_stream_next_size (cstream, chunk_size);
}

GstBuffer *
gst_rtmp_chunk_stream_parse_finish (GstRtmpChunkStream * cstream)
{
  GstBuffer *buffer, *empty;

  g_return_val_if_fail (cstream, NULL);
  g_return_val_if_fail (cstream->buffer, NULL);

  buffer = gst_buffer_ref (cstream->buffer);
  GST_BUFFER_OFFSET_END (buffer) = cstream->bytes;

  gst_rtmp_buffer_dump (buffer, "<<< message");

  chunk_stream_clear (cstream);

  empty = gst_buffer_new ();

  if (!gst_buffer_copy_into (empty, buffer, GST_BUFFER_COPY_META, 0, 0)) {
    GST_ERROR ("copy_into failed");
    return NULL;
  }

  GST_BUFFER_DTS (empty) = GST_BUFFER_DTS (buffer);
  GST_BUFFER_OFFSET (empty) = GST_BUFFER_OFFSET_END (buffer);

  chunk_stream_take_buffer (cstream, empty);

  return buffer;
}

GstBuffer *
gst_rtmp_chunk_stream_serialize_start (GstRtmpChunkStream * cstream,
    GstBuffer * buffer, guint32 chunk_size)
{
  ChunkType type;

  g_return_val_if_fail (cstream, NULL);
  g_return_val_if_fail (GST_IS_BUFFER (buffer), NULL);

  type = select_chunk_type (cstream, buffer);
  g_return_val_if_fail (type >= 0, NULL);

  GST_TRACE ("Starting serialization of message %" GST_PTR_FORMAT
      " into stream %" G_GUINT32_FORMAT, buffer, cstream->id);

  gst_rtmp_buffer_dump (buffer, ">>> message");

  chunk_stream_clear (cstream);
  chunk_stream_take_buffer (cstream, gst_buffer_ref (buffer));

  return serialize_next (cstream, chunk_size, type);
}

GstBuffer *
gst_rtmp_chunk_stream_serialize_next (GstRtmpChunkStream * cstream,
    guint32 chunk_size)
{
  g_return_val_if_fail (cstream, NULL);
  g_return_val_if_fail (cstream->buffer, NULL);

  if (chunk_stream_next_size (cstream, chunk_size) == 0) {
    GST_TRACE ("Message serialization finished");
    return NULL;
  }

  GST_TRACE ("Continuing serialization of message %" GST_PTR_FORMAT
      " into stream %" G_GUINT32_FORMAT, cstream->buffer, cstream->id);

  return serialize_next (cstream, chunk_size, CHUNK_TYPE_3);
}

GstBuffer *
gst_rtmp_chunk_stream_serialize_all (GstRtmpChunkStream * cstream,
    GstBuffer * buffer, guint32 chunk_size)
{
  GstBuffer *outbuf, *nextbuf;

  outbuf = gst_rtmp_chunk_stream_serialize_start (cstream, buffer, chunk_size);
  nextbuf = gst_rtmp_chunk_stream_serialize_next (cstream, chunk_size);

  while (nextbuf) {
    outbuf = gst_buffer_append (outbuf, nextbuf);
    nextbuf = gst_rtmp_chunk_stream_serialize_next (cstream, chunk_size);
  }

  return outbuf;
}

GstRtmpChunkStreams *
gst_rtmp_chunk_streams_new (void)
{
  GstRtmpChunkStreams *cstreams;

  init_debug ();

  cstreams = g_slice_new (GstRtmpChunkStreams);
  cstreams->array = g_array_new (FALSE, TRUE, sizeof (GstRtmpChunkStream));
  g_array_set_clear_func (cstreams->array,
      (GDestroyNotify) gst_rtmp_chunk_stream_clear);
  return cstreams;
}

void
gst_rtmp_chunk_streams_free (gpointer ptr)
{
  GstRtmpChunkStreams *cstreams = ptr;
  g_clear_pointer (&cstreams->array, g_array_unref);
  g_slice_free (GstRtmpChunkStreams, cstreams);
}

GstRtmpChunkStream *
gst_rtmp_chunk_streams_get (GstRtmpChunkStreams * cstreams, guint32 id)
{
  GArray *array;
  GstRtmpChunkStream *entry;
  guint i;

  g_return_val_if_fail (cstreams, NULL);
  g_return_val_if_fail (id > CHUNK_BYTE_THREEBYTE, NULL);
  g_return_val_if_fail (id <= CHUNK_STREAM_MAX_THREEBYTE, NULL);

  array = cstreams->array;

  for (i = 0; i < array->len; i++) {
    entry = &g_array_index (array, GstRtmpChunkStream, i);
    if (entry->id == id) {
      GST_TRACE ("Obtaining chunk stream %" G_GUINT32_FORMAT, id);
      return entry;
    }
  }

  GST_DEBUG ("Allocating chunk stream %" G_GUINT32_FORMAT, id);

  g_array_set_size (array, i + 1);
  entry = &g_array_index (array, GstRtmpChunkStream, i);
  entry->id = id;
  return entry;
}