/*
 * Copyright (c) 2015, Collabora Ltd.
 *
 * Redistribution and use in source and binary forms, with or without modification,
 * are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright notice, this
 * list of conditions and the following disclaimer.
 *
 * 2. Redistributions in binary form must reproduce the above copyright notice, this
 * list of conditions and the following disclaimer in the documentation and/or other
 * materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
 * IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
 * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY
 * OF SUCH DAMAGE.
 */

#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#include "gstsctpenc.h"

#include <gst/sctp/sctpsendmeta.h>
#include <stdio.h>

GST_DEBUG_CATEGORY_STATIC (gst_sctp_enc_debug_category);
#define GST_CAT_DEFAULT gst_sctp_enc_debug_category

#define gst_sctp_enc_parent_class parent_class
G_DEFINE_TYPE (GstSctpEnc, gst_sctp_enc, GST_TYPE_ELEMENT);
GST_ELEMENT_REGISTER_DEFINE (sctpenc, "sctpenc", GST_RANK_NONE,
    GST_TYPE_SCTP_ENC);

static GstStaticPadTemplate sink_template =
GST_STATIC_PAD_TEMPLATE ("sink_%u", GST_PAD_SINK,
    GST_PAD_REQUEST, GST_STATIC_CAPS_ANY);

static GstStaticPadTemplate src_template =
GST_STATIC_PAD_TEMPLATE ("src", GST_PAD_SRC,
    GST_PAD_ALWAYS, GST_STATIC_CAPS ("application/x-sctp"));

enum
{
  SIGNAL_SCTP_ASSOCIATION_ESTABLISHED,
  SIGNAL_GET_STREAM_BYTES_SENT,
  NUM_SIGNALS
};

static guint signals[NUM_SIGNALS];

enum
{
  PROP_0,

  PROP_GST_SCTP_ASSOCIATION_ID,
  PROP_REMOTE_SCTP_PORT,
  PROP_USE_SOCK_STREAM,

  NUM_PROPERTIES
};

static GParamSpec *properties[NUM_PROPERTIES];

#define DEFAULT_GST_SCTP_ASSOCIATION_ID 1
#define DEFAULT_REMOTE_SCTP_PORT 0
#define DEFAULT_GST_SCTP_ORDERED TRUE
#define DEFAULT_SCTP_PPID 1
#define DEFAULT_USE_SOCK_STREAM FALSE

#define BUFFER_FULL_SLEEP_TIME 100000

GType gst_sctp_enc_pad_get_type (void);

#define GST_TYPE_SCTP_ENC_PAD (gst_sctp_enc_pad_get_type())
#define GST_SCTP_ENC_PAD(obj) (G_TYPE_CHECK_INSTANCE_CAST((obj), GST_TYPE_SCTP_ENC_PAD, GstSctpEncPad))
#define GST_SCTP_ENC_PAD_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST((klass), GST_TYPE_SCTP_ENC_PAD, GstSctpEncPadClass))
#define GST_IS_SCTP_ENC_PAD(obj) (G_TYPE_CHECK_INSTANCE_TYPE((obj), GST_TYPE_SCTP_ENC_PAD))
#define GST_IS_SCTP_ENC_PAD_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE((klass), GST_TYPE_SCTP_ENC_PAD))

typedef struct _GstSctpEncPad GstSctpEncPad;
typedef GstPadClass GstSctpEncPadClass;

struct _GstSctpEncPad
{
  GstPad parent;

  guint16 stream_id;
  gboolean ordered;
  guint32 ppid;
  GstSctpAssociationPartialReliability reliability;
  guint32 reliability_param;

  guint64 bytes_sent;

  GMutex lock;
  GCond cond;
  gboolean flushing;
  gboolean clear_to_send;
};

G_DEFINE_TYPE (GstSctpEncPad, gst_sctp_enc_pad, GST_TYPE_PAD);

static void
gst_sctp_enc_pad_finalize (GObject * object)
{
  GstSctpEncPad *self = GST_SCTP_ENC_PAD (object);

  g_cond_clear (&self->cond);
  g_mutex_clear (&self->lock);

  G_OBJECT_CLASS (gst_sctp_enc_pad_parent_class)->finalize (object);
}

static void
gst_sctp_enc_pad_class_init (GstSctpEncPadClass * klass)
{
  GObjectClass *gobject_class = (GObjectClass *) klass;

  gobject_class->finalize = gst_sctp_enc_pad_finalize;
}

static void
gst_sctp_enc_pad_init (GstSctpEncPad * self)
{
  g_mutex_init (&self->lock);
  g_cond_init (&self->cond);
  self->flushing = FALSE;
  self->clear_to_send = FALSE;
}

static void gst_sctp_enc_finalize (GObject * object);
static void gst_sctp_enc_set_property (GObject * object, guint prop_id,
    const GValue * value, GParamSpec * pspec);
static void gst_sctp_enc_get_property (GObject * object, guint prop_id,
    GValue * value, GParamSpec * pspec);
static GstStateChangeReturn gst_sctp_enc_change_state (GstElement * element,
    GstStateChange transition);
static GstPad *gst_sctp_enc_request_new_pad (GstElement * element,
    GstPadTemplate * template, const gchar * name, const GstCaps * caps);
static void gst_sctp_enc_release_pad (GstElement * element, GstPad * pad);
static void gst_sctp_enc_srcpad_loop (GstPad * pad);
static GstFlowReturn gst_sctp_enc_sink_chain (GstPad * pad, GstObject * parent,
    GstBuffer * buffer);
static gboolean gst_sctp_enc_sink_event (GstPad * pad, GstObject * parent,
    GstEvent * event);
static gboolean gst_sctp_enc_src_event (GstPad * pad, GstObject * parent,
    GstEvent * event);
static void on_sctp_association_state_changed (GstSctpAssociation *
    sctp_association, GParamSpec * pspec, GstSctpEnc * self);

static gboolean configure_association (GstSctpEnc * self);
static void on_sctp_packet_out (GstSctpAssociation * sctp_association,
    const guint8 * buf, gsize length, gpointer user_data);
static void stop_srcpad_task (GstPad * pad, GstSctpEnc * self);
static void sctpenc_cleanup (GstSctpEnc * self);
static void get_config_from_caps (const GstCaps * caps, gboolean * ordered,
    GstSctpAssociationPartialReliability * reliability,
    guint32 * reliability_param, guint32 * ppid, gboolean * ppid_available);
static guint64 on_get_stream_bytes_sent (GstSctpEnc * self, guint stream_id);

static void
gst_sctp_enc_class_init (GstSctpEncClass * klass)
{
  GObjectClass *gobject_class;
  GstElementClass *element_class;

  gobject_class = (GObjectClass *) klass;
  element_class = (GstElementClass *) klass;

  GST_DEBUG_CATEGORY_INIT (gst_sctp_enc_debug_category,
      "sctpenc", 0, "debug category for sctpenc element");

  gst_element_class_add_pad_template (GST_ELEMENT_CLASS (klass),
      gst_static_pad_template_get (&src_template));
  gst_element_class_add_pad_template (GST_ELEMENT_CLASS (klass),
      gst_static_pad_template_get (&sink_template));

  gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_sctp_enc_finalize);
  gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_sctp_enc_set_property);
  gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_sctp_enc_get_property);

  element_class->change_state = GST_DEBUG_FUNCPTR (gst_sctp_enc_change_state);
  element_class->request_new_pad =
      GST_DEBUG_FUNCPTR (gst_sctp_enc_request_new_pad);
  element_class->release_pad = GST_DEBUG_FUNCPTR (gst_sctp_enc_release_pad);

  properties[PROP_GST_SCTP_ASSOCIATION_ID] =
      g_param_spec_uint ("sctp-association-id",
      "SCTP Association ID",
      "Every encoder/decoder pair should have the same, unique, sctp-association-id. "
      "This value must be set before any pads are requested.",
      0, G_MAXUINT, DEFAULT_GST_SCTP_ASSOCIATION_ID,
      G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);

  properties[PROP_REMOTE_SCTP_PORT] =
      g_param_spec_uint ("remote-sctp-port",
      "Remote SCTP port",
      "Sctp remote sctp port for the sctp association. The local port is configured via the "
      "GstSctpDec element.",
      0, G_MAXUSHORT, DEFAULT_REMOTE_SCTP_PORT,
      G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);

  properties[PROP_USE_SOCK_STREAM] =
      g_param_spec_boolean ("use-sock-stream",
      "Use sock-stream",
      "When set to TRUE, a sequenced, reliable, connection-based connection is used."
      "When TRUE the partial reliability parameters of the channel are ignored.",
      DEFAULT_USE_SOCK_STREAM, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);

  g_object_class_install_properties (gobject_class, NUM_PROPERTIES, properties);

  signals[SIGNAL_SCTP_ASSOCIATION_ESTABLISHED] =
      g_signal_new ("sctp-association-established",
      G_TYPE_FROM_CLASS (gobject_class), G_SIGNAL_RUN_LAST,
      G_STRUCT_OFFSET (GstSctpEncClass, on_sctp_association_is_established),
      NULL, NULL, NULL, G_TYPE_NONE, 1, G_TYPE_BOOLEAN);

  signals[SIGNAL_GET_STREAM_BYTES_SENT] = g_signal_new ("bytes-sent",
      G_TYPE_FROM_CLASS (gobject_class), G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
      G_STRUCT_OFFSET (GstSctpEncClass, on_get_stream_bytes_sent), NULL, NULL,
      NULL, G_TYPE_UINT64, 1, G_TYPE_UINT);

  klass->on_get_stream_bytes_sent =
      GST_DEBUG_FUNCPTR (on_get_stream_bytes_sent);

  gst_element_class_set_static_metadata (element_class,
      "SCTP Encoder",
      "Encoder/Network/SCTP",
      "Encodes packets with SCTP",
      "George Kiagiadakis <george.kiagiadakis@collabora.com>");
}

static gboolean
data_queue_check_full_cb (GstDataQueue * queue, guint visible, guint bytes,
    guint64 time, gpointer user_data)
{
  /* TODO: When are we considered full? */
  return FALSE;
}

static void
data_queue_empty_cb (GstDataQueue * queue, gpointer user_data)
{
}

static void
data_queue_full_cb (GstDataQueue * queue, gpointer user_data)
{
}

static void
gst_sctp_enc_init (GstSctpEnc * self)
{
  self->sctp_association_id = DEFAULT_GST_SCTP_ASSOCIATION_ID;
  self->remote_sctp_port = DEFAULT_REMOTE_SCTP_PORT;

  self->sctp_association = NULL;
  self->outbound_sctp_packet_queue =
      gst_data_queue_new (data_queue_check_full_cb, data_queue_full_cb,
      data_queue_empty_cb, NULL);

  self->src_pad = gst_pad_new_from_static_template (&src_template, "src");
  gst_pad_set_event_function (self->src_pad,
      GST_DEBUG_FUNCPTR ((GstPadEventFunction) gst_sctp_enc_src_event));
  gst_element_add_pad (GST_ELEMENT (self), self->src_pad);

  g_queue_init (&self->pending_pads);
  self->src_ret = GST_FLOW_FLUSHING;
}

static void
gst_sctp_enc_finalize (GObject * object)
{
  GstSctpEnc *self = GST_SCTP_ENC (object);

  g_queue_clear (&self->pending_pads);
  gst_object_unref (self->outbound_sctp_packet_queue);

  G_OBJECT_CLASS (parent_class)->finalize (object);
}

static void
gst_sctp_enc_set_property (GObject * object, guint prop_id,
    const GValue * value, GParamSpec * pspec)
{
  GstSctpEnc *self = GST_SCTP_ENC (object);

  switch (prop_id) {
    case PROP_GST_SCTP_ASSOCIATION_ID:
      self->sctp_association_id = g_value_get_uint (value);
      break;
    case PROP_REMOTE_SCTP_PORT:
      self->remote_sctp_port = g_value_get_uint (value);
      break;
    case PROP_USE_SOCK_STREAM:
      self->use_sock_stream = g_value_get_boolean (value);
      break;
    default:
      G_OBJECT_WARN_INVALID_PROPERTY_ID (self, prop_id, pspec);
      break;
  }
}

static void
gst_sctp_enc_get_property (GObject * object, guint prop_id, GValue * value,
    GParamSpec * pspec)
{
  GstSctpEnc *self = GST_SCTP_ENC (object);

  switch (prop_id) {
    case PROP_GST_SCTP_ASSOCIATION_ID:
      g_value_set_uint (value, self->sctp_association_id);
      break;
    case PROP_REMOTE_SCTP_PORT:
      g_value_set_uint (value, self->remote_sctp_port);
      break;
    case PROP_USE_SOCK_STREAM:
      g_value_set_boolean (value, self->use_sock_stream);
      break;
    default:
      G_OBJECT_WARN_INVALID_PROPERTY_ID (self, prop_id, pspec);
      break;
  }
}

static GstStateChangeReturn
gst_sctp_enc_change_state (GstElement * element, GstStateChange transition)
{
  GstSctpEnc *self = GST_SCTP_ENC (element);
  GstStateChangeReturn ret = GST_STATE_CHANGE_FAILURE;
  gboolean res = TRUE;

  switch (transition) {
    case GST_STATE_CHANGE_NULL_TO_READY:
      break;
    case GST_STATE_CHANGE_READY_TO_PAUSED:
      self->need_segment = self->need_stream_start_caps = TRUE;
      self->src_ret = GST_FLOW_OK;
      gst_data_queue_set_flushing (self->outbound_sctp_packet_queue, FALSE);
      res = configure_association (self);
      break;
    case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
      break;
    case GST_STATE_CHANGE_PAUSED_TO_READY:
      stop_srcpad_task (self->src_pad, self);
      self->src_ret = GST_FLOW_FLUSHING;
      break;
    case GST_STATE_CHANGE_READY_TO_NULL:
      break;
    default:
      break;
  }

  if (res)
    ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);

  switch (transition) {
    case GST_STATE_CHANGE_NULL_TO_READY:
      break;
    case GST_STATE_CHANGE_READY_TO_PAUSED:
      gst_pad_start_task (self->src_pad,
          (GstTaskFunction) gst_sctp_enc_srcpad_loop, self->src_pad, NULL);
      break;
    case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
      break;
    case GST_STATE_CHANGE_PAUSED_TO_READY:
      sctpenc_cleanup (self);
      break;
    case GST_STATE_CHANGE_READY_TO_NULL:
      break;
    default:
      break;
  }

  return ret;
}

static GstPad *
gst_sctp_enc_request_new_pad (GstElement * element, GstPadTemplate * template,
    const gchar * new_pad_name, const GstCaps * caps)
{
  GstSctpEnc *self = GST_SCTP_ENC (element);
  GstPad *new_pad = NULL;
  GstSctpEncPad *sctpenc_pad;
  guint32 stream_id;
  gint state;
  guint32 new_ppid;
  gboolean is_new_ppid;

  g_object_get (self->sctp_association, "state", &state, NULL);

  if (state != GST_SCTP_ASSOCIATION_STATE_CONNECTED) {
    GST_ERROR_OBJECT
        (self,
        "The SCTP association must be established before a new stream can be created");
    goto invalid_state;
  }

  if (!template)
    goto invalid_parameter;

  /* 65535 is not a valid stream id */
  if (!new_pad_name || (sscanf (new_pad_name, "sink_%u", &stream_id) != 1)
      || stream_id > 65534) {
    GST_ERROR_OBJECT
        (self, "Invalid sink pad name %s", GST_STR_NULL (new_pad_name));
    goto invalid_parameter;
  }

  new_pad = gst_element_get_static_pad (element, new_pad_name);
  if (new_pad) {
    gst_object_unref (new_pad);
    new_pad = NULL;
    GST_ERROR_OBJECT (self, "Pad %s already exists", new_pad_name);
    goto invalid_parameter;
  }

  GST_DEBUG_OBJECT (self, "Creating new pad %s", new_pad_name);
  new_pad =
      g_object_new (GST_TYPE_SCTP_ENC_PAD, "name", new_pad_name, "direction",
      template->direction, "template", template, NULL);
  gst_pad_set_chain_function (new_pad,
      GST_DEBUG_FUNCPTR (gst_sctp_enc_sink_chain));
  gst_pad_set_event_function (new_pad,
      GST_DEBUG_FUNCPTR (gst_sctp_enc_sink_event));

  sctpenc_pad = GST_SCTP_ENC_PAD (new_pad);
  sctpenc_pad->stream_id = stream_id;
  sctpenc_pad->ppid = DEFAULT_SCTP_PPID;

  if (caps) {
    GST_DEBUG_OBJECT (self, "Pad %s requested with caps %" GST_PTR_FORMAT,
        new_pad_name, caps);
    get_config_from_caps (caps, &sctpenc_pad->ordered,
        &sctpenc_pad->reliability, &sctpenc_pad->reliability_param, &new_ppid,
        &is_new_ppid);

    if (is_new_ppid)
      sctpenc_pad->ppid = new_ppid;
  }

  sctpenc_pad->flushing = FALSE;

  if (!gst_pad_set_active (new_pad, TRUE))
    goto error_cleanup;

  if (!gst_element_add_pad (element, new_pad))
    goto error_add_pad;

invalid_state:
invalid_parameter:
  return new_pad;
error_add_pad:
  gst_pad_set_active (new_pad, FALSE);
error_cleanup:
  gst_object_unref (new_pad);
  return NULL;
}

static void
gst_sctp_enc_release_pad (GstElement * element, GstPad * pad)
{
  GstSctpEncPad *sctpenc_pad = GST_SCTP_ENC_PAD (pad);
  GstSctpEnc *self;
  guint stream_id = 0;

  self = GST_SCTP_ENC (element);

  g_mutex_lock (&sctpenc_pad->lock);
  sctpenc_pad->flushing = TRUE;
  g_cond_signal (&sctpenc_pad->cond);
  g_mutex_unlock (&sctpenc_pad->lock);

  stream_id = sctpenc_pad->stream_id;
  gst_pad_set_active (pad, FALSE);

  if (self->sctp_association)
    gst_sctp_association_reset_stream (self->sctp_association, stream_id);

  GST_PAD_STREAM_LOCK (pad);
  if (gst_object_has_as_parent (GST_OBJECT (pad), GST_OBJECT (element)))
    gst_element_remove_pad (element, pad);
  GST_PAD_STREAM_UNLOCK (pad);
}

static void
gst_sctp_enc_srcpad_loop (GstPad * pad)
{
  GstSctpEnc *self = GST_SCTP_ENC (GST_PAD_PARENT (pad));
  GstFlowReturn flow_ret;
  GstDataQueueItem *item;

  if (self->need_stream_start_caps) {
    gchar s_id[32];
    GstCaps *caps;

    g_snprintf (s_id, sizeof (s_id), "sctpenc-%08x", g_random_int ());
    gst_pad_push_event (self->src_pad, gst_event_new_stream_start (s_id));

    caps = gst_caps_new_empty_simple ("application/x-sctp");
    gst_pad_set_caps (self->src_pad, caps);
    gst_caps_unref (caps);

    self->need_stream_start_caps = FALSE;
  }

  if (self->need_segment) {
    GstSegment segment;

    gst_segment_init (&segment, GST_FORMAT_BYTES);
    gst_pad_push_event (self->src_pad, gst_event_new_segment (&segment));

    self->need_segment = FALSE;
  }

  if (gst_data_queue_pop (self->outbound_sctp_packet_queue, &item)) {
    GstBuffer *buffer = GST_BUFFER (item->object);

    GST_DEBUG_OBJECT (self, "Forwarding buffer %" GST_PTR_FORMAT, buffer);

    flow_ret = gst_pad_push (self->src_pad, buffer);
    item->object = NULL;

    GST_OBJECT_LOCK (self);
    self->src_ret = flow_ret;
    GST_OBJECT_UNLOCK (self);

    if (G_UNLIKELY (flow_ret == GST_FLOW_FLUSHING
            || flow_ret == GST_FLOW_NOT_LINKED)) {
      GST_DEBUG_OBJECT (pad, "Push failed on packet source pad. Error: %s",
          gst_flow_get_name (flow_ret));
    } else if (G_UNLIKELY (flow_ret != GST_FLOW_OK)) {
      GST_ERROR_OBJECT (pad, "Push failed on packet source pad. Error: %s",
          gst_flow_get_name (flow_ret));
    }

    if (G_UNLIKELY (flow_ret != GST_FLOW_OK)) {
      GST_DEBUG_OBJECT (pad, "Pausing task because of an error");
      gst_data_queue_set_flushing (self->outbound_sctp_packet_queue, TRUE);
      gst_data_queue_flush (self->outbound_sctp_packet_queue);
      gst_pad_pause_task (pad);
    }

    item->destroy (item);
  } else {
    GST_OBJECT_LOCK (self);
    self->src_ret = GST_FLOW_FLUSHING;
    GST_OBJECT_UNLOCK (self);

    GST_DEBUG_OBJECT (pad, "Pausing task because we're flushing");
    gst_pad_pause_task (pad);
  }
}

static GstFlowReturn
gst_sctp_enc_sink_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
{
  GstSctpEnc *self = GST_SCTP_ENC (parent);
  GstSctpEncPad *sctpenc_pad = GST_SCTP_ENC_PAD (pad);
  GstSctpEncPad *sctpenc_pad_next = NULL;
  GstMapInfo map;
  guint32 ppid;
  gboolean ordered;
  GstSctpAssociationPartialReliability pr;
  guint32 pr_param;
  gpointer state = NULL;
  GstMeta *meta;
  const GstMetaInfo *meta_info = GST_SCTP_SEND_META_INFO;
  GstFlowReturn flow_ret = GST_FLOW_ERROR;
  const guint8 *data;
  guint32 length;
  gboolean clear_to_send;

  GST_OBJECT_LOCK (self);
  if (self->src_ret != GST_FLOW_OK) {
    GST_ERROR_OBJECT (pad, "Pushing on source pad failed before: %s",
        gst_flow_get_name (self->src_ret));
    flow_ret = self->src_ret;
    GST_OBJECT_UNLOCK (self);
    gst_buffer_unref (buffer);
    return flow_ret;
  }
  GST_OBJECT_UNLOCK (self);

  ppid = sctpenc_pad->ppid;
  ordered = sctpenc_pad->ordered;
  pr = sctpenc_pad->reliability;
  pr_param = sctpenc_pad->reliability_param;

  while ((meta = gst_buffer_iterate_meta (buffer, &state))) {
    if (meta->info->api == meta_info->api) {
      GstSctpSendMeta *sctp_send_meta = (GstSctpSendMeta *) meta;

      ppid = sctp_send_meta->ppid;
      ordered = sctp_send_meta->ordered;
      pr_param = sctp_send_meta->pr_param;
      switch (sctp_send_meta->pr) {
        case GST_SCTP_SEND_META_PARTIAL_RELIABILITY_NONE:
          pr = GST_SCTP_ASSOCIATION_PARTIAL_RELIABILITY_NONE;
          break;
        case GST_SCTP_SEND_META_PARTIAL_RELIABILITY_RTX:
          pr = GST_SCTP_ASSOCIATION_PARTIAL_RELIABILITY_RTX;
          break;
        case GST_SCTP_SEND_META_PARTIAL_RELIABILITY_BUF:
          pr = GST_SCTP_ASSOCIATION_PARTIAL_RELIABILITY_BUF;
          break;
        case GST_SCTP_SEND_META_PARTIAL_RELIABILITY_TTL:
          pr = GST_SCTP_ASSOCIATION_PARTIAL_RELIABILITY_TTL;
          break;
      }
      break;
    }
  }

  GST_DEBUG_OBJECT (pad,
      "Sending buffer %" GST_PTR_FORMAT
      " with ppid %u ordered %d pr %d pr_param %u", buffer, ppid, ordered, pr,
      pr_param);

  if (!gst_buffer_map (buffer, &map, GST_MAP_READ)) {
    GST_ERROR_OBJECT (pad, "Could not map GstBuffer");
    goto error;
  }

  data = map.data;
  length = map.size;

  GST_OBJECT_LOCK (self);
  clear_to_send = g_queue_is_empty (&self->pending_pads);
  g_queue_push_tail (&self->pending_pads, sctpenc_pad);
  GST_OBJECT_UNLOCK (self);

  g_mutex_lock (&sctpenc_pad->lock);

  if (clear_to_send) {
    sctpenc_pad->clear_to_send = TRUE;
  }

  while (!sctpenc_pad->flushing && !sctpenc_pad->clear_to_send) {
    g_cond_wait (&sctpenc_pad->cond, &sctpenc_pad->lock);
  }

  while (!sctpenc_pad->flushing) {
    guint32 bytes_sent;

    g_mutex_unlock (&sctpenc_pad->lock);

    flow_ret =
        gst_sctp_association_send_data (self->sctp_association, data,
        length, sctpenc_pad->stream_id, ppid, ordered, pr, pr_param,
        &bytes_sent);

    g_mutex_lock (&sctpenc_pad->lock);
    if (flow_ret != GST_FLOW_OK) {
      if (flow_ret != GST_FLOW_EOS) {
        GST_ELEMENT_ERROR (self, RESOURCE, WRITE, (NULL),
            ("Failed to send data"));
      }
      goto out;
    } else if (bytes_sent < length && !sctpenc_pad->flushing) {
      gint64 end_time = g_get_monotonic_time () + BUFFER_FULL_SLEEP_TIME;

      GST_TRACE_OBJECT (pad, "Sent only %u of %u remaining bytes, waiting",
          bytes_sent, length);

      sctpenc_pad->bytes_sent += bytes_sent;
      data += bytes_sent;
      length -= bytes_sent;

      /* The buffer was probably full. Retry in a while */
      g_cond_wait_until (&sctpenc_pad->cond, &sctpenc_pad->lock, end_time);

    } else if (bytes_sent == length) {
      GST_DEBUG_OBJECT (pad, "Successfully sent buffer");
      sctpenc_pad->bytes_sent += bytes_sent;
      break;
    }
  }
  flow_ret = sctpenc_pad->flushing ? GST_FLOW_FLUSHING : GST_FLOW_OK;

out:
  sctpenc_pad->clear_to_send = FALSE;
  g_mutex_unlock (&sctpenc_pad->lock);

  GST_OBJECT_LOCK (self);
  g_queue_remove (&self->pending_pads, sctpenc_pad);
  sctpenc_pad_next = g_queue_peek_head (&self->pending_pads);
  GST_OBJECT_UNLOCK (self);

  if (sctpenc_pad_next) {
    g_mutex_lock (&sctpenc_pad_next->lock);
    sctpenc_pad_next->clear_to_send = TRUE;
    g_cond_signal (&sctpenc_pad_next->cond);
    g_mutex_unlock (&sctpenc_pad_next->lock);
  }

  gst_buffer_unmap (buffer, &map);
error:
  gst_buffer_unref (buffer);
  return flow_ret;
}

static gboolean
gst_sctp_enc_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
{
  GstSctpEnc *self = GST_SCTP_ENC (parent);
  GstSctpEncPad *sctpenc_pad = GST_SCTP_ENC_PAD (pad);
  gboolean ret, is_new_ppid;
  guint32 new_ppid;

  switch (GST_EVENT_TYPE (event)) {
    case GST_EVENT_CAPS:{
      GstCaps *caps;

      gst_event_parse_caps (event, &caps);
      GST_DEBUG_OBJECT (pad, "Received new caps %" GST_PTR_FORMAT, caps);
      get_config_from_caps (caps, &sctpenc_pad->ordered,
          &sctpenc_pad->reliability, &sctpenc_pad->reliability_param, &new_ppid,
          &is_new_ppid);
      if (is_new_ppid)
        sctpenc_pad->ppid = new_ppid;
      gst_event_unref (event);
      ret = TRUE;
      break;
    }
    case GST_EVENT_STREAM_START:
    case GST_EVENT_SEGMENT:
      /* Drop these, we create our own */
      ret = TRUE;
      gst_event_unref (event);
      break;
    case GST_EVENT_EOS:
      /* Drop this, we're never EOS until shut down */
      ret = TRUE;
      gst_event_unref (event);
      break;
    case GST_EVENT_FLUSH_START:
      g_mutex_lock (&sctpenc_pad->lock);
      sctpenc_pad->flushing = TRUE;
      g_cond_signal (&sctpenc_pad->cond);
      g_mutex_unlock (&sctpenc_pad->lock);

      ret = gst_pad_event_default (pad, parent, event);
      break;
    case GST_EVENT_FLUSH_STOP:
      sctpenc_pad->flushing = FALSE;
      GST_OBJECT_LOCK (self);
      self->src_ret = GST_FLOW_OK;
      GST_OBJECT_UNLOCK (self);
      ret = gst_pad_event_default (pad, parent, event);
      break;
    default:
      ret = gst_pad_event_default (pad, parent, event);
      break;
  }
  return ret;
}

static void
flush_sinkpad (const GValue * item, gpointer user_data)
{
  GstSctpEncPad *sctpenc_pad = g_value_get_object (item);
  gboolean flush = GPOINTER_TO_INT (user_data);

  if (flush) {
    g_mutex_lock (&sctpenc_pad->lock);
    sctpenc_pad->flushing = TRUE;
    g_cond_signal (&sctpenc_pad->cond);
    g_mutex_unlock (&sctpenc_pad->lock);
  } else {
    sctpenc_pad->flushing = FALSE;
  }
}

static gboolean
gst_sctp_enc_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
{
  GstSctpEnc *self = GST_SCTP_ENC (parent);
  gboolean ret;

  switch (GST_EVENT_TYPE (event)) {
    case GST_EVENT_FLUSH_START:{
      GstIterator *it;

      gst_data_queue_set_flushing (self->outbound_sctp_packet_queue, TRUE);
      gst_data_queue_flush (self->outbound_sctp_packet_queue);

      it = gst_element_iterate_sink_pads (GST_ELEMENT (self));
      while (gst_iterator_foreach (it, flush_sinkpad,
              GINT_TO_POINTER (TRUE)) == GST_ITERATOR_RESYNC)
        gst_iterator_resync (it);
      gst_iterator_free (it);

      ret = gst_pad_event_default (pad, parent, event);
      break;
    }
    case GST_EVENT_RECONFIGURE:
    case GST_EVENT_FLUSH_STOP:{
      GstIterator *it;

      it = gst_element_iterate_sink_pads (GST_ELEMENT (self));
      while (gst_iterator_foreach (it, flush_sinkpad,
              GINT_TO_POINTER (FALSE)) == GST_ITERATOR_RESYNC)
        gst_iterator_resync (it);
      gst_iterator_free (it);

      gst_data_queue_set_flushing (self->outbound_sctp_packet_queue, FALSE);
      self->need_segment = TRUE;
      GST_OBJECT_LOCK (self);
      self->src_ret = GST_FLOW_OK;
      GST_OBJECT_UNLOCK (self);
      gst_pad_start_task (self->src_pad,
          (GstTaskFunction) gst_sctp_enc_srcpad_loop, self->src_pad, NULL);

      ret = gst_pad_event_default (pad, parent, event);
      break;
    }
    default:
      ret = gst_pad_event_default (pad, parent, event);
      break;
  }
  return ret;
}

static gboolean
configure_association (GstSctpEnc * self)
{
  gint state;

  self->sctp_association = gst_sctp_association_get (self->sctp_association_id);

  g_object_get (self->sctp_association, "state", &state, NULL);

  if (state != GST_SCTP_ASSOCIATION_STATE_NEW) {
    GST_WARNING_OBJECT (self,
        "Could not configure SCTP association. Association already in use!");
    g_object_unref (self->sctp_association);
    self->sctp_association = NULL;
    goto error;
  }

  self->signal_handler_state_changed =
      g_signal_connect_object (self->sctp_association, "notify::state",
      G_CALLBACK (on_sctp_association_state_changed), self, 0);

  g_object_bind_property (self, "remote-sctp-port", self->sctp_association,
      "remote-port", G_BINDING_SYNC_CREATE);

  g_object_bind_property (self, "use-sock-stream", self->sctp_association,
      "use-sock-stream", G_BINDING_SYNC_CREATE);

  gst_sctp_association_set_on_packet_out (self->sctp_association,
      on_sctp_packet_out, gst_object_ref (self), gst_object_unref);

  return TRUE;
error:
  return FALSE;
}

static void
on_sctp_association_state_changed (GstSctpAssociation * sctp_association,
    GParamSpec * pspec, GstSctpEnc * self)
{
  gint state;

  g_object_get (sctp_association, "state", &state, NULL);

  GST_DEBUG_OBJECT (self, "Association state changed to %d", state);

  switch (state) {
    case GST_SCTP_ASSOCIATION_STATE_NEW:
      break;
    case GST_SCTP_ASSOCIATION_STATE_READY:
      gst_sctp_association_start (sctp_association);
      break;
    case GST_SCTP_ASSOCIATION_STATE_CONNECTING:
      break;
    case GST_SCTP_ASSOCIATION_STATE_CONNECTED:
      g_signal_emit (self, signals[SIGNAL_SCTP_ASSOCIATION_ESTABLISHED], 0,
          TRUE);
      break;
    case GST_SCTP_ASSOCIATION_STATE_DISCONNECTING:
    case GST_SCTP_ASSOCIATION_STATE_DISCONNECTED:
      g_signal_emit (self, signals[SIGNAL_SCTP_ASSOCIATION_ESTABLISHED], 0,
          FALSE);
      break;
    case GST_SCTP_ASSOCIATION_STATE_ERROR:
      GST_ELEMENT_ERROR (self, RESOURCE, WRITE, (NULL),
          ("SCTP association went into error state"));
      break;
  }
}

static void
data_queue_item_free (GstDataQueueItem * item)
{
  if (item->object)
    gst_mini_object_unref (item->object);
  g_free (item);
}

static void
on_sctp_packet_out (GstSctpAssociation * _association, const guint8 * buf,
    gsize length, gpointer user_data)
{
  GstSctpEnc *self = user_data;
  GstBuffer *gstbuf;
  GstDataQueueItem *item;
  GstSctpEncPad *sctpenc_pad;

  GST_DEBUG_OBJECT (self, "Received output packet of size %" G_GSIZE_FORMAT,
      length);

  gstbuf = gst_buffer_new_memdup (buf, length);

  item = g_new0 (GstDataQueueItem, 1);
  item->object = GST_MINI_OBJECT (gstbuf);
  item->size = length;
  item->visible = TRUE;
  item->destroy = (GDestroyNotify) data_queue_item_free;

  if (!gst_data_queue_push (self->outbound_sctp_packet_queue, item)) {
    item->destroy (item);
    GST_DEBUG_OBJECT (self, "Failed to push item because we're flushing");
  }

  /* Wake up the oldest pad which is the one that needs to finish first */
  GST_OBJECT_LOCK (self);
  sctpenc_pad = g_queue_peek_head (&self->pending_pads);
  if (sctpenc_pad) {
    gst_object_ref (sctpenc_pad);

    GST_OBJECT_UNLOCK (self);

    g_mutex_lock (&sctpenc_pad->lock);
    g_cond_signal (&sctpenc_pad->cond);
    g_mutex_unlock (&sctpenc_pad->lock);

    gst_object_unref (sctpenc_pad);
  } else {
    GST_OBJECT_UNLOCK (self);
  }
}

static void
stop_srcpad_task (GstPad * pad, GstSctpEnc * self)
{
  gst_data_queue_set_flushing (self->outbound_sctp_packet_queue, TRUE);
  gst_data_queue_flush (self->outbound_sctp_packet_queue);
  gst_pad_stop_task (pad);
}

static void
remove_sinkpad (const GValue * item, gpointer user_data)
{
  GstSctpEncPad *sctpenc_pad = g_value_get_object (item);
  GstSctpEnc *self = user_data;

  gst_sctp_enc_release_pad (GST_ELEMENT (self), GST_PAD (sctpenc_pad));
}

static void
sctpenc_cleanup (GstSctpEnc * self)
{
  GstIterator *it;

  gst_sctp_association_set_on_packet_out (self->sctp_association, NULL, NULL,
      NULL);

  g_signal_handler_disconnect (self->sctp_association,
      self->signal_handler_state_changed);
  stop_srcpad_task (self->src_pad, self);
  gst_sctp_association_force_close (self->sctp_association);
  g_object_unref (self->sctp_association);
  self->sctp_association = NULL;

  it = gst_element_iterate_sink_pads (GST_ELEMENT (self));
  while (gst_iterator_foreach (it, remove_sinkpad, self) == GST_ITERATOR_RESYNC)
    gst_iterator_resync (it);
  gst_iterator_free (it);
  g_queue_clear (&self->pending_pads);
}

static void
get_config_from_caps (const GstCaps * caps, gboolean * ordered,
    GstSctpAssociationPartialReliability * reliability,
    guint32 * reliability_param, guint32 * ppid, gboolean * ppid_available)
{
  GstStructure *s;
  guint i, n;

  *ordered = TRUE;
  *reliability = GST_SCTP_ASSOCIATION_PARTIAL_RELIABILITY_NONE;
  *reliability_param = 0;
  *ppid_available = FALSE;

  n = gst_caps_get_size (caps);
  for (i = 0; i < n; i++) {
    s = gst_caps_get_structure (caps, i);
    if (gst_structure_has_field (s, "ordered")) {
      const GValue *v = gst_structure_get_value (s, "ordered");
      *ordered = g_value_get_boolean (v);
    }
    if (gst_structure_has_field (s, "partially-reliability")) {
      const GValue *v = gst_structure_get_value (s, "partially-reliability");
      const gchar *reliability_string = g_value_get_string (v);

      if (!g_strcmp0 (reliability_string, "none"))
        *reliability = GST_SCTP_ASSOCIATION_PARTIAL_RELIABILITY_NONE;
      else if (!g_strcmp0 (reliability_string, "ttl"))
        *reliability = GST_SCTP_ASSOCIATION_PARTIAL_RELIABILITY_TTL;
      else if (!g_strcmp0 (reliability_string, "buf"))
        *reliability = GST_SCTP_ASSOCIATION_PARTIAL_RELIABILITY_BUF;
      else if (!g_strcmp0 (reliability_string, "rtx"))
        *reliability = GST_SCTP_ASSOCIATION_PARTIAL_RELIABILITY_RTX;
    }
    if (gst_structure_has_field (s, "reliability-parameter")) {
      const GValue *v = gst_structure_get_value (s, "reliability-parameter");
      *reliability_param = g_value_get_uint (v);
    }
    if (gst_structure_has_field (s, "ppid")) {
      const GValue *v = gst_structure_get_value (s, "ppid");
      *ppid = g_value_get_uint (v);
      *ppid_available = TRUE;
    }
  }
}

static guint64
on_get_stream_bytes_sent (GstSctpEnc * self, guint stream_id)
{
  gchar *pad_name;
  GstPad *pad;
  GstSctpEncPad *sctpenc_pad;
  guint64 bytes_sent;

  pad_name = g_strdup_printf ("sink_%u", stream_id);
  pad = gst_element_get_static_pad (GST_ELEMENT (self), pad_name);
  g_free (pad_name);

  if (!pad) {
    GST_DEBUG_OBJECT (self,
        "Buffered amount requested on a stream that does not exist!");
    return 0;
  }

  sctpenc_pad = GST_SCTP_ENC_PAD (pad);

  g_mutex_lock (&sctpenc_pad->lock);
  bytes_sent = sctpenc_pad->bytes_sent;
  g_mutex_unlock (&sctpenc_pad->lock);

  gst_object_unref (sctpenc_pad);

  return bytes_sent;
}