/* GStreamer
 * Copyright (C) <2004> Thomas Vander Stichele <thomas at apestaart dot org>
 * Copyright (C) 2006 Andy Wingo <wingo@pobox.com>
 * Copyright (C) 2008 Vincent Penquerc'h <ogg.k.ogg.k@googlemail.com>
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Library General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Library General Public License for more details.
 *
 * You should have received a copy of the GNU Library General Public
 * License along with this library; if not, write to the
 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
 * Boston, MA 02110-1301, USA.
 */

/**
 * SECTION:element-kateparse
 * @title: kateparse
 * @short_description: parses kate streams
 * @see_also: katedec, vorbisparse, oggdemux, theoraparse
 *
 * The kateparse element will parse the header packets of the Kate
 * stream and put them as the streamheader in the caps. This is used in the
 * multifdsink case where you want to stream live kate streams to multiple
 * clients, each client has to receive the streamheaders first before they can
 * consume the kate packets.
 *
 * This element also makes sure that the buffers that it pushes out are properly
 * timestamped and that their offset and offset_end are set. The buffers that
 * kateparse outputs have all of the metadata that oggmux expects to receive,
 * which allows you to (for example) remux an ogg/kate file.
 *
 * ## Example pipelines
 *
 * |[
 * gst-launch-1.0 -v filesrc location=kate.ogg ! oggdemux ! kateparse ! fakesink
 * ]|
 * This pipeline shows that the streamheader is set in the caps, and that each
 * buffer has the timestamp, duration, offset, and offset_end set.
 *
 * |[
 * gst-launch-1.0 filesrc location=kate.ogg ! oggdemux ! kateparse \
 *            ! oggmux ! filesink location=kate-remuxed.ogg
 * ]|
 * This pipeline shows remuxing. kate-remuxed.ogg might not be exactly the same
 * as kate.ogg, but they should produce exactly the same decoded data.
 *
 */

#ifdef HAVE_CONFIG_H
#  include "config.h"
#endif

#include "gstkate.h"
#include "gstkateutil.h"
#include "gstkateparse.h"

GST_DEBUG_CATEGORY_EXTERN (gst_kateparse_debug);
#define GST_CAT_DEFAULT gst_kateparse_debug

static GstStaticPadTemplate gst_kate_parse_sink_factory =
    GST_STATIC_PAD_TEMPLATE ("sink",
    GST_PAD_SINK,
    GST_PAD_ALWAYS,
    GST_STATIC_CAPS ("subtitle/x-kate; application/x-kate")
    );

static GstStaticPadTemplate gst_kate_parse_src_factory =
    GST_STATIC_PAD_TEMPLATE ("src",
    GST_PAD_SRC,
    GST_PAD_ALWAYS,
    GST_STATIC_CAPS ("subtitle/x-kate; application/x-kate")
    );

#define gst_kate_parse_parent_class parent_class
G_DEFINE_TYPE (GstKateParse, gst_kate_parse, GST_TYPE_ELEMENT);

static GstFlowReturn gst_kate_parse_chain (GstPad * pad, GstObject * parent,
    GstBuffer * buffer);
static GstStateChangeReturn gst_kate_parse_change_state (GstElement * element,
    GstStateChange transition);
static gboolean gst_kate_parse_sink_event (GstPad * pad, GstObject * parent,
    GstEvent * event);
static gboolean gst_kate_parse_src_query (GstPad * pad, GstObject * parent,
    GstQuery * query);
#if 0
static gboolean gst_kate_parse_convert (GstPad * pad,
    GstFormat src_format, gint64 src_value,
    GstFormat * dest_format, gint64 * dest_value);
#endif
static GstFlowReturn gst_kate_parse_parse_packet (GstKateParse * parse,
    GstBuffer * buf);

static void
gst_kate_parse_class_init (GstKateParseClass * klass)
{
  GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);

  gstelement_class->change_state = gst_kate_parse_change_state;

  gst_element_class_add_static_pad_template (gstelement_class,
      &gst_kate_parse_src_factory);
  gst_element_class_add_static_pad_template (gstelement_class,
      &gst_kate_parse_sink_factory);

  gst_element_class_set_static_metadata (gstelement_class, "Kate stream parser",
      "Codec/Parser/Subtitle",
      "parse raw kate streams",
      "Vincent Penquerc'h <ogg.k.ogg.k at googlemail dot com>");

  klass->parse_packet = GST_DEBUG_FUNCPTR (gst_kate_parse_parse_packet);
}

static void
gst_kate_parse_init (GstKateParse * parse)
{
  parse->sinkpad =
      gst_pad_new_from_static_template (&gst_kate_parse_sink_factory, "sink");
  gst_pad_set_chain_function (parse->sinkpad,
      GST_DEBUG_FUNCPTR (gst_kate_parse_chain));
  gst_pad_set_event_function (parse->sinkpad,
      GST_DEBUG_FUNCPTR (gst_kate_parse_sink_event));
  gst_element_add_pad (GST_ELEMENT (parse), parse->sinkpad);

  parse->srcpad =
      gst_pad_new_from_static_template (&gst_kate_parse_src_factory, "src");
  gst_pad_set_query_function (parse->srcpad,
      GST_DEBUG_FUNCPTR (gst_kate_parse_src_query));
  gst_element_add_pad (GST_ELEMENT (parse), parse->srcpad);
}

static void
gst_kate_parse_drain_event_queue (GstKateParse * parse)
{
  while (parse->event_queue->length) {
    GstEvent *event;

    event = GST_EVENT_CAST (g_queue_pop_head (parse->event_queue));
    gst_pad_event_default (parse->sinkpad, NULL, event);
  }
}

static GstFlowReturn
gst_kate_parse_push_headers (GstKateParse * parse)
{
  /* mark and put on caps */
  GstCaps *caps;
  GstBuffer *outbuf;
  kate_packet packet;
  GList *headers, *outbuf_list = NULL;
  int ret;
  gboolean res;

  /* get the headers into the caps, passing them to kate as we go */
  caps =
      gst_kate_util_set_header_on_caps (&parse->element,
      gst_pad_get_current_caps (parse->sinkpad), parse->streamheader);

  if (G_UNLIKELY (!caps)) {
    GST_ELEMENT_ERROR (parse, STREAM, DECODE, (NULL),
        ("Failed to set headers on caps"));
    return GST_FLOW_ERROR;
  }

  GST_DEBUG_OBJECT (parse, "here are the caps: %" GST_PTR_FORMAT, caps);
  res = gst_pad_set_caps (parse->srcpad, caps);
  gst_caps_unref (caps);
  if (G_UNLIKELY (!res)) {
    GST_WARNING_OBJECT (parse->srcpad, "Failed to set caps on source pad");
    return GST_FLOW_NOT_NEGOTIATED;
  }

  headers = parse->streamheader;
  while (headers) {
    GstMapInfo info;

    outbuf = GST_BUFFER_CAST (headers->data);

    if (!gst_buffer_map (outbuf, &info, GST_MAP_READ)) {
      GST_WARNING_OBJECT (outbuf, "Failed to map buffer");
      continue;
    }

    kate_packet_wrap (&packet, info.size, info.data);
    ret = kate_decode_headerin (&parse->ki, &parse->kc, &packet);
    if (G_UNLIKELY (ret < 0)) {
      GST_WARNING_OBJECT (parse, "Failed to decode header: %s",
          gst_kate_util_get_error_message (ret));
    }
    gst_buffer_unmap (outbuf, &info);
    /* takes ownership of outbuf, which was previously in parse->streamheader */
    GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_HEADER);
    outbuf_list = g_list_append (outbuf_list, outbuf);
    headers = headers->next;
  }

  /* first process queued events */
  gst_kate_parse_drain_event_queue (parse);

  /* push out buffers, ignoring return value... */
  headers = outbuf_list;
  while (headers) {
    outbuf = GST_BUFFER_CAST (headers->data);
    gst_pad_push (parse->srcpad, outbuf);
    headers = headers->next;
  }

  g_list_free (outbuf_list);
  g_list_free (parse->streamheader);
  parse->streamheader = NULL;

  parse->streamheader_sent = TRUE;

  return GST_FLOW_OK;
}

static void
gst_kate_parse_clear_queue (GstKateParse * parse)
{
  GST_DEBUG_OBJECT (parse, "Clearing queue");
  while (parse->buffer_queue->length) {
    GstBuffer *buf;

    buf = GST_BUFFER_CAST (g_queue_pop_head (parse->buffer_queue));
    gst_buffer_unref (buf);
  }
  while (parse->event_queue->length) {
    GstEvent *event;

    event = GST_EVENT_CAST (g_queue_pop_head (parse->event_queue));
    gst_event_unref (event);
  }
}

static GstFlowReturn
gst_kate_parse_push_buffer (GstKateParse * parse, GstBuffer * buf,
    gint64 granulepos)
{
  GST_LOG_OBJECT (parse, "granulepos %16" G_GINT64_MODIFIER "x", granulepos);
  if (granulepos < 0) {
    /* packets coming not from Ogg won't have a granpos in the offset end,
       so we have to synthesize one here - only problem is we don't know
       the backlink - pretend there's none for now */
    GST_INFO_OBJECT (parse, "No granulepos on buffer, synthesizing one");
    granulepos =
        kate_duration_granule (&parse->ki,
        GST_BUFFER_TIMESTAMP (buf) /
        (double) GST_SECOND) << kate_granule_shift (&parse->ki);
  }
  GST_BUFFER_OFFSET (buf) =
      kate_granule_time (&parse->ki, granulepos) * GST_SECOND;
  GST_BUFFER_OFFSET_END (buf) = granulepos;
  GST_BUFFER_TIMESTAMP (buf) = GST_BUFFER_OFFSET (buf);

  return gst_pad_push (parse->srcpad, buf);
}

static GstFlowReturn
gst_kate_parse_drain_queue_prematurely (GstKateParse * parse)
{
  GstFlowReturn ret = GST_FLOW_OK;

  /* got an EOS event, make sure to push out any buffers that were in the queue
   * -- won't normally be the case, but this catches the
   * didn't-get-a-granulepos-on-the-last-packet case. Assuming a continuous
   * stream. */

  /* if we got EOS before any buffers came, go ahead and push the other events
   * first */
  gst_kate_parse_drain_event_queue (parse);

  while (!g_queue_is_empty (parse->buffer_queue)) {
    GstBuffer *buf;
    gint64 granpos;

    buf = GST_BUFFER_CAST (g_queue_pop_head (parse->buffer_queue));

    granpos = GST_BUFFER_OFFSET_END (buf);
    ret = gst_kate_parse_push_buffer (parse, buf, granpos);

    if (ret != GST_FLOW_OK)
      goto done;
  }

  g_assert (g_queue_is_empty (parse->buffer_queue));

done:
  return ret;
}

static GstFlowReturn
gst_kate_parse_drain_queue (GstKateParse * parse, gint64 granulepos)
{
  GstFlowReturn ret = GST_FLOW_OK;

  if (!g_queue_is_empty (parse->buffer_queue)) {
    GstBuffer *buf;
    buf = GST_BUFFER_CAST (g_queue_pop_head (parse->buffer_queue));
    ret = gst_kate_parse_push_buffer (parse, buf, granulepos);

    if (ret != GST_FLOW_OK)
      goto done;
  }
  g_assert (g_queue_is_empty (parse->buffer_queue));

done:
  return ret;
}

static GstFlowReturn
gst_kate_parse_queue_buffer (GstKateParse * parse, GstBuffer * buf)
{
  GstFlowReturn ret = GST_FLOW_OK;
  gint64 granpos;

  buf = gst_buffer_make_writable (buf);

  /* oggdemux stores the granule pos in the offset end */
  granpos = GST_BUFFER_OFFSET_END (buf);
  GST_LOG_OBJECT (parse, "granpos %16" G_GINT64_MODIFIER "x", granpos);
  g_queue_push_tail (parse->buffer_queue, buf);

#if 1
  /* if getting buffers from matroska, we won't have a granpos here... */
  //if (GST_BUFFER_OFFSET_END_IS_VALID (buf)) {
  ret = gst_kate_parse_drain_queue (parse, granpos);
  //}
#else
  if (granpos >= 0) {
    ret = gst_kate_parse_drain_queue (parse, granpos);
  } else {
    GST_ELEMENT_ERROR (parse, STREAM, DECODE, (NULL),
        ("Bad granulepos %" G_GINT64_FORMAT, granpos));
    ret = GST_FLOW_ERROR;
  }
#endif

  return ret;
}

static GstFlowReturn
gst_kate_parse_parse_packet (GstKateParse * parse, GstBuffer * buf)
{
  GstFlowReturn ret = GST_FLOW_OK;
  guint8 header[1];
  gsize size;

  g_assert (parse);

  parse->packetno++;

  size = gst_buffer_extract (buf, 0, header, 1);

  GST_LOG_OBJECT (parse, "Got packet %02x, %" G_GSIZE_FORMAT " bytes",
      size ? header[0] : -1, gst_buffer_get_size (buf));

  if (size > 0 && header[0] & 0x80) {
    GST_DEBUG_OBJECT (parse, "Found header %02x", header[0]);
    /* if 0x80 is set, it's streamheader,
     * so put it on the streamheader list and return */
    parse->streamheader = g_list_append (parse->streamheader, buf);
    ret = GST_FLOW_OK;
  } else {
    if (!parse->streamheader_sent) {
      GST_DEBUG_OBJECT (parse, "Found non header, pushing headers seen so far");
      ret = gst_kate_parse_push_headers (parse);
    }

    if (ret == GST_FLOW_OK) {
      ret = gst_kate_parse_queue_buffer (parse, buf);
    }
  }

  return ret;
}

static GstFlowReturn
gst_kate_parse_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
{
  GstKateParseClass *klass;
  GstKateParse *parse;

  parse = GST_KATE_PARSE (parent);
  klass = GST_KATE_PARSE_CLASS (G_OBJECT_GET_CLASS (parse));

  g_assert (klass->parse_packet != NULL);

  if (G_UNLIKELY (!gst_pad_has_current_caps (pad)))
    return GST_FLOW_NOT_NEGOTIATED;

  return klass->parse_packet (parse, buffer);
}

static gboolean
gst_kate_parse_queue_event (GstKateParse * parse, GstEvent * event)
{
  GstFlowReturn ret = TRUE;

  g_queue_push_tail (parse->event_queue, event);

  return ret;
}

static gboolean
gst_kate_parse_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
{
  gboolean ret;
  GstKateParse *parse;

  parse = GST_KATE_PARSE (parent);

  switch (GST_EVENT_TYPE (event)) {
    case GST_EVENT_FLUSH_STOP:
      gst_kate_parse_clear_queue (parse);
      ret = gst_pad_event_default (pad, parent, event);
      break;
    case GST_EVENT_EOS:
      if (!parse->streamheader_sent) {
        GST_DEBUG_OBJECT (parse, "Got EOS, pushing headers seen so far");
        ret = gst_kate_parse_push_headers (parse);
        if (ret != GST_FLOW_OK)
          break;
      }
      gst_kate_parse_drain_queue_prematurely (parse);
      ret = gst_pad_event_default (pad, parent, event);
      break;
    default:
      if (!parse->streamheader_sent && GST_EVENT_IS_SERIALIZED (event)
          && GST_EVENT_TYPE (event) > GST_EVENT_CAPS)
        ret = gst_kate_parse_queue_event (parse, event);
      else
        ret = gst_pad_event_default (pad, parent, event);
      break;
  }

  return ret;
}

#if 0
static gboolean
gst_kate_parse_convert (GstPad * pad,
    GstFormat src_format, gint64 src_value,
    GstFormat * dest_format, gint64 * dest_value)
{
  gboolean res = TRUE;
  GstKateParse *parse;

  parse = GST_KATE_PARSE (GST_PAD_PARENT (pad));

  /* fixme: assumes atomic access to lots of instance variables modified from
   * the streaming thread, including 64-bit variables */

  if (!parse->streamheader_sent)
    return FALSE;

  if (src_format == *dest_format) {
    *dest_value = src_value;
    return TRUE;
  }

  if (parse->sinkpad == pad &&
      (src_format == GST_FORMAT_BYTES || *dest_format == GST_FORMAT_BYTES))
    return FALSE;

  switch (src_format) {
    case GST_FORMAT_TIME:
      switch (*dest_format) {
        default:
          res = FALSE;
      }
      break;
    case GST_FORMAT_DEFAULT:
      switch (*dest_format) {
        case GST_FORMAT_TIME:
          *dest_value = kate_granule_time (&parse->ki, src_value) * GST_SECOND;
          break;
        default:
          res = FALSE;
      }
      break;
    default:
      res = FALSE;
  }

  return res;
}
#endif

static gboolean
gst_kate_parse_src_query (GstPad * pad, GstObject * parent, GstQuery * query)
{
#if 1
  // TODO
  GST_WARNING ("gst_kate_parse_src_query");
  return FALSE;
#else
  gint64 granulepos;
  GstKateParse *parse;
  gboolean res = FALSE;

  parse = GST_KATE_PARSE (parent);

  switch (GST_QUERY_TYPE (query)) {
    case GST_QUERY_POSITION:
    {
      GstFormat format;
      gint64 value;

      granulepos = parse->prev_granulepos;

      gst_query_parse_position (query, &format, NULL);

      /* and convert to the final format */
      if (!(res =
              gst_kate_parse_convert (pad, GST_FORMAT_DEFAULT, granulepos,
                  &format, &value)))
        goto error;

      /* fixme: support segments
         value = (value - parse->segment_start) + parse->segment_time;
       */

      gst_query_set_position (query, format, value);

      GST_LOG_OBJECT (parse, "query %p: peer returned granulepos: %"
          G_GUINT64_FORMAT " - we return %" G_GUINT64_FORMAT " (format %u)",
          query, granulepos, value, format);

      break;
    }
    case GST_QUERY_DURATION:
    {
      /* fixme: not threadsafe */
      /* query peer for total length */
      if (!gst_pad_is_linked (parse->sinkpad)) {
        GST_WARNING_OBJECT (parse, "sink pad %" GST_PTR_FORMAT " is not linked",
            parse->sinkpad);
        goto error;
      }
      if (!(res = gst_pad_query (GST_PAD_PEER (parse->sinkpad), query)))
        goto error;
      break;
    }
    case GST_QUERY_CONVERT:
    {
      GstFormat src_fmt, dest_fmt;
      gint64 src_val, dest_val;

      gst_query_parse_convert (query, &src_fmt, &src_val, &dest_fmt, &dest_val);
      if (!(res =
              gst_kate_parse_convert (pad, src_fmt, src_val, &dest_fmt,
                  &dest_val)))
        goto error;
      gst_query_set_convert (query, src_fmt, src_val, dest_fmt, dest_val);
      break;
    }
    default:
      res = gst_pad_query_default (pad, query);
      break;
  }
  return res;

error:
  {
    GST_WARNING_OBJECT (parse, "error handling query");
    return res;
  }
#endif
}

static void
gst_kate_parse_free_stream_headers (GstKateParse * parse)
{
  while (parse->streamheader != NULL) {
    gst_buffer_unref (GST_BUFFER (parse->streamheader->data));
    parse->streamheader = g_list_delete_link (parse->streamheader,
        parse->streamheader);
  }
}

static GstStateChangeReturn
gst_kate_parse_change_state (GstElement * element, GstStateChange transition)
{
  GstKateParse *parse = GST_KATE_PARSE (element);
  GstStateChangeReturn ret;

  switch (transition) {
    case GST_STATE_CHANGE_READY_TO_PAUSED:
      kate_info_init (&parse->ki);
      kate_comment_init (&parse->kc);
      parse->packetno = 0;
      parse->streamheader_sent = FALSE;
      parse->streamheader = NULL;
      parse->buffer_queue = g_queue_new ();
      parse->event_queue = g_queue_new ();
      break;
    default:
      break;
  }

  ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);

  switch (transition) {
    case GST_STATE_CHANGE_PAUSED_TO_READY:
      kate_info_clear (&parse->ki);
      kate_comment_clear (&parse->kc);

      gst_kate_parse_clear_queue (parse);
      g_queue_free (parse->buffer_queue);
      parse->buffer_queue = NULL;
      g_queue_free (parse->event_queue);
      parse->event_queue = NULL;
      gst_kate_parse_free_stream_headers (parse);
      break;

    default:
      break;
  }

  return ret;
}