gst/gdp/gstgdpdepay.c: Disable seeking.

Original commit message from CVS:
* gst/gdp/gstgdpdepay.c: (gst_gdp_depay_init),
(gst_gdp_depay_finalize), (gst_gdp_depay_sink_event),
(gst_gdp_depay_src_event), (gst_gdp_depay_chain),
(gst_gdp_depay_change_state):
Disable seeking.
Small cleanups.
Clear adapter on disconts.
Clear caps when going to READY instead of NULL
* gst/gdp/gstgdppay.c: (gst_gdp_pay_class_init),
(gst_gdp_pay_init), (gst_gdp_pay_finalize), (gst_gdp_pay_reset),
(gst_gdp_buffer_from_caps), (gst_gdp_pay_buffer_from_buffer),
(gst_gdp_buffer_from_event), (gst_gdp_pay_reset_streamheader),
(gst_gdp_queue_buffer), (gst_gdp_pay_chain),
(gst_gdp_pay_sink_event), (gst_gdp_pay_src_event),
(gst_gdp_pay_change_state):
* gst/gdp/gstgdppay.h:
Reset payloader when going to READY.
Fix leaked buffers in ->queue on push errors.
Disable seeking.
Code cleanups.
Create packetizer in _init, free in _finalize.
This commit is contained in:
Wim Taymans 2006-08-02 16:56:19 +00:00
parent 9b839af5f1
commit f7b0af5902
3 changed files with 381 additions and 200 deletions

View file

@ -56,7 +56,6 @@ GST_ELEMENT_DETAILS ("GDP Depayloader",
enum enum
{ {
PROP_0, PROP_0,
/* FILL ME */
}; };
static GstStaticPadTemplate gdp_depay_sink_template = static GstStaticPadTemplate gdp_depay_sink_template =
@ -82,6 +81,8 @@ GST_BOILERPLATE_FULL (GstGDPDepay, gst_gdp_depay, GstElement,
GST_TYPE_ELEMENT, _do_init); GST_TYPE_ELEMENT, _do_init);
static gboolean gst_gdp_depay_sink_event (GstPad * pad, GstEvent * event); static gboolean gst_gdp_depay_sink_event (GstPad * pad, GstEvent * event);
static gboolean gst_gdp_depay_src_event (GstPad * pad, GstEvent * event);
static GstFlowReturn gst_gdp_depay_chain (GstPad * pad, GstBuffer * buffer); static GstFlowReturn gst_gdp_depay_chain (GstPad * pad, GstBuffer * buffer);
static GstStateChangeReturn gst_gdp_depay_change_state (GstElement * static GstStateChangeReturn gst_gdp_depay_change_state (GstElement *
@ -129,6 +130,8 @@ gst_gdp_depay_init (GstGDPDepay * gdpdepay, GstGDPDepayClass * g_class)
gdpdepay->srcpad = gdpdepay->srcpad =
gst_pad_new_from_static_template (&gdp_depay_src_template, "src"); gst_pad_new_from_static_template (&gdp_depay_src_template, "src");
gst_pad_set_event_function (gdpdepay->srcpad,
GST_DEBUG_FUNCPTR (gst_gdp_depay_src_event));
/* our caps will always be decided by the incoming GDP caps buffers */ /* our caps will always be decided by the incoming GDP caps buffers */
gst_pad_use_fixed_caps (gdpdepay->srcpad); gst_pad_use_fixed_caps (gdpdepay->srcpad);
gst_element_add_pad (GST_ELEMENT (gdpdepay), gdpdepay->srcpad); gst_element_add_pad (GST_ELEMENT (gdpdepay), gdpdepay->srcpad);
@ -144,7 +147,6 @@ gst_gdp_depay_finalize (GObject * gobject)
this = GST_GDP_DEPAY (gobject); this = GST_GDP_DEPAY (gobject);
if (this->caps) if (this->caps)
gst_caps_unref (this->caps); gst_caps_unref (this->caps);
if (this->header)
g_free (this->header); g_free (this->header);
gst_adapter_clear (this->adapter); gst_adapter_clear (this->adapter);
g_object_unref (this->adapter); g_object_unref (this->adapter);
@ -152,11 +154,6 @@ gst_gdp_depay_finalize (GObject * gobject)
GST_CALL_PARENT (G_OBJECT_CLASS, finalize, (gobject)); GST_CALL_PARENT (G_OBJECT_CLASS, finalize, (gobject));
} }
/* we ignore most events because the only events we want to output are those
* found in the gdp data stream.
* An exception is the EOS event, if we get that on the sinkpad, we certainly
* are not going to produce more data.
*/
static gboolean static gboolean
gst_gdp_depay_sink_event (GstPad * pad, GstEvent * event) gst_gdp_depay_sink_event (GstPad * pad, GstEvent * event)
{ {
@ -166,10 +163,20 @@ gst_gdp_depay_sink_event (GstPad * pad, GstEvent * event)
this = GST_GDP_DEPAY (gst_pad_get_parent (pad)); this = GST_GDP_DEPAY (gst_pad_get_parent (pad));
switch (GST_EVENT_TYPE (event)) { switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_EOS: case GST_EVENT_FLUSH_START:
case GST_EVENT_FLUSH_STOP:
/* forward flush start and stop */
res = gst_pad_push_event (this->srcpad, event); res = gst_pad_push_event (this->srcpad, event);
break; break;
case GST_EVENT_EOS:
/* after EOS, we don't expect to output anything anymore */
res = gst_pad_push_event (this->srcpad, event);
break;
case GST_EVENT_NEWSEGMENT:
case GST_EVENT_TAG:
case GST_EVENT_BUFFERSIZE:
default: default:
/* we unref most events as we take them from the datastream */
gst_event_unref (event); gst_event_unref (event);
break; break;
} }
@ -178,6 +185,32 @@ gst_gdp_depay_sink_event (GstPad * pad, GstEvent * event)
return res; return res;
} }
static gboolean
gst_gdp_depay_src_event (GstPad * pad, GstEvent * event)
{
GstGDPDepay *this;
gboolean res = TRUE;
this = GST_GDP_DEPAY (gst_pad_get_parent (pad));
switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_SEEK:
/* we refuse seek for now. */
gst_event_unref (event);
res = FALSE;
break;
case GST_EVENT_QOS:
case GST_EVENT_NAVIGATION:
default:
/* everything else is passed */
res = gst_pad_push_event (this->sinkpad, event);
break;
}
gst_object_unref (this);
return res;
}
static GstFlowReturn static GstFlowReturn
gst_gdp_depay_chain (GstPad * pad, GstBuffer * buffer) gst_gdp_depay_chain (GstPad * pad, GstBuffer * buffer)
{ {
@ -186,108 +219,124 @@ gst_gdp_depay_chain (GstPad * pad, GstBuffer * buffer)
GstCaps *caps; GstCaps *caps;
GstBuffer *buf; GstBuffer *buf;
GstEvent *event; GstEvent *event;
guint8 *header = NULL;
guint8 *payload = NULL;
guint available; guint available;
gboolean running = TRUE;
this = GST_GDP_DEPAY (gst_pad_get_parent (pad)); this = GST_GDP_DEPAY (gst_pad_get_parent (pad));
/* On DISCONT, get rid of accumulated data. We assume a buffer after the
* DISCONT contains (part of) a new valid header, if not we error because we
* lost sync */
if (GST_BUFFER_IS_DISCONT (buffer)) {
gst_adapter_clear (this->adapter);
this->state = GST_GDP_DEPAY_STATE_HEADER;
}
gst_adapter_push (this->adapter, buffer); gst_adapter_push (this->adapter, buffer);
while (running) { while (TRUE) {
switch (this->state) { switch (this->state) {
case GST_GDP_DEPAY_STATE_HEADER: case GST_GDP_DEPAY_STATE_HEADER:
available = gst_adapter_available (this->adapter); {
if (available < GST_DP_HEADER_LENGTH) { guint8 *header;
running = FALSE;
break; /* collect a complete header, validate and store the header. Figure out
} * the payload length and switch to the PAYLOAD state */
available = gst_adapter_available (this->adapter);
if (available < GST_DP_HEADER_LENGTH)
goto done;
if (this->header)
g_free (this->header);
GST_LOG_OBJECT (this, "reading GDP header from adapter"); GST_LOG_OBJECT (this, "reading GDP header from adapter");
header = gst_adapter_take (this->adapter, GST_DP_HEADER_LENGTH); header = gst_adapter_take (this->adapter, GST_DP_HEADER_LENGTH);
if (!gst_dp_validate_header (GST_DP_HEADER_LENGTH, header)) if (!gst_dp_validate_header (GST_DP_HEADER_LENGTH, header)) {
g_free (header);
goto header_validate_error; goto header_validate_error;
}
/* store types and payload length. Also store the header, which we need
* to make the payload. */
this->payload_length = gst_dp_header_payload_length (header); this->payload_length = gst_dp_header_payload_length (header);
this->payload_type = gst_dp_header_payload_type (header); this->payload_type = gst_dp_header_payload_type (header);
/* free previous header and store new one. */
g_free (this->header);
this->header = header; this->header = header;
GST_LOG_OBJECT (this, GST_LOG_OBJECT (this,
"read GDP header, payload size %d, switching to state PAYLOAD", "read GDP header, payload size %d, switching to state PAYLOAD",
this->payload_length); this->payload_length);
this->state = GST_GDP_DEPAY_STATE_PAYLOAD; this->state = GST_GDP_DEPAY_STATE_PAYLOAD;
break; break;
case GST_GDP_DEPAY_STATE_PAYLOAD:
available = gst_adapter_available (this->adapter);
if (available < this->payload_length) {
running = FALSE;
break;
} }
case GST_GDP_DEPAY_STATE_PAYLOAD:
{
/* in this state we wait for all the payload data to be available in the
* adapter. Then we switch to the state where we actually process the
* payload. */
available = gst_adapter_available (this->adapter);
if (available < this->payload_length)
goto done;
/* change state based on type */ /* change state based on type */
if (this->payload_type == GST_DP_PAYLOAD_BUFFER) { switch (this->payload_type) {
case GST_DP_PAYLOAD_BUFFER:
GST_LOG_OBJECT (this, "switching to state BUFFER"); GST_LOG_OBJECT (this, "switching to state BUFFER");
this->state = GST_GDP_DEPAY_STATE_BUFFER; this->state = GST_GDP_DEPAY_STATE_BUFFER;
} else if (this->payload_type == GST_DP_PAYLOAD_CAPS) { break;
case GST_DP_PAYLOAD_CAPS:
GST_LOG_OBJECT (this, "switching to state CAPS"); GST_LOG_OBJECT (this, "switching to state CAPS");
this->state = GST_GDP_DEPAY_STATE_CAPS; this->state = GST_GDP_DEPAY_STATE_CAPS;
} else if (this->payload_type >= GST_DP_PAYLOAD_EVENT_NONE) { break;
case GST_DP_PAYLOAD_EVENT_NONE:
GST_LOG_OBJECT (this, "switching to state EVENT"); GST_LOG_OBJECT (this, "switching to state EVENT");
this->state = GST_GDP_DEPAY_STATE_EVENT; this->state = GST_GDP_DEPAY_STATE_EVENT;
} else
goto wrong_type;
break; break;
default:
case GST_GDP_DEPAY_STATE_BUFFER: goto wrong_type;
if (!this->caps) {
GST_ELEMENT_ERROR (this, STREAM, DECODE, (NULL),
("Received a buffer without first receiving caps"));
ret = GST_FLOW_NOT_NEGOTIATED;
goto done;
} }
break;
}
case GST_GDP_DEPAY_STATE_BUFFER:
{
/* if we receive a buffer without caps first, we error out */
if (!this->caps)
goto no_caps;
GST_LOG_OBJECT (this, "reading GDP buffer from adapter"); GST_LOG_OBJECT (this, "reading GDP buffer from adapter");
buf = gst_dp_buffer_from_header (GST_DP_HEADER_LENGTH, this->header); buf = gst_dp_buffer_from_header (GST_DP_HEADER_LENGTH, this->header);
if (!buf) { if (!buf)
GST_ELEMENT_ERROR (this, STREAM, DECODE, (NULL), goto buffer_failed;
("could not create buffer from GDP packet"));
ret = GST_FLOW_ERROR; /* now take the payload if there is any */
goto done; if (this->payload_length > 0) {
} guint8 *payload;
payload = gst_adapter_take (this->adapter, this->payload_length); payload = gst_adapter_take (this->adapter, this->payload_length);
memcpy (GST_BUFFER_DATA (buf), payload, this->payload_length); memcpy (GST_BUFFER_DATA (buf), payload, this->payload_length);
g_free (payload); g_free (payload);
payload = NULL; }
/* set caps and push */
gst_buffer_set_caps (buf, this->caps); gst_buffer_set_caps (buf, this->caps);
ret = gst_pad_push (this->srcpad, buf); ret = gst_pad_push (this->srcpad, buf);
if (ret != GST_FLOW_OK) { if (ret != GST_FLOW_OK)
GST_WARNING_OBJECT (this, "pushing depayloaded buffer returned %d", goto push_error;
ret);
goto done;
}
GST_LOG_OBJECT (this, "switching to state HEADER"); GST_LOG_OBJECT (this, "switching to state HEADER");
this->state = GST_GDP_DEPAY_STATE_HEADER; this->state = GST_GDP_DEPAY_STATE_HEADER;
break; break;
}
case GST_GDP_DEPAY_STATE_CAPS: case GST_GDP_DEPAY_STATE_CAPS:
{
guint8 *payload;
/* take the payload of the caps */
GST_LOG_OBJECT (this, "reading GDP caps from adapter"); GST_LOG_OBJECT (this, "reading GDP caps from adapter");
payload = gst_adapter_take (this->adapter, this->payload_length); payload = gst_adapter_take (this->adapter, this->payload_length);
caps = gst_dp_caps_from_packet (GST_DP_HEADER_LENGTH, this->header, caps = gst_dp_caps_from_packet (GST_DP_HEADER_LENGTH, this->header,
payload); payload);
g_free (payload); g_free (payload);
payload = NULL; if (!caps)
if (!caps) { goto caps_failed;
GST_ELEMENT_ERROR (this, STREAM, DECODE, (NULL),
("could not create caps from GDP packet"));
ret = GST_FLOW_ERROR;
goto done;
}
GST_DEBUG_OBJECT (this, "read caps %" GST_PTR_FORMAT, caps); GST_DEBUG_OBJECT (this, "read caps %" GST_PTR_FORMAT, caps);
gst_caps_replace (&(this->caps), caps); gst_caps_replace (&(this->caps), caps);
gst_pad_set_caps (this->srcpad, caps); gst_pad_set_caps (this->srcpad, caps);
@ -297,24 +346,24 @@ gst_gdp_depay_chain (GstPad * pad, GstBuffer * buffer)
GST_LOG_OBJECT (this, "switching to state HEADER"); GST_LOG_OBJECT (this, "switching to state HEADER");
this->state = GST_GDP_DEPAY_STATE_HEADER; this->state = GST_GDP_DEPAY_STATE_HEADER;
break; break;
}
case GST_GDP_DEPAY_STATE_EVENT: case GST_GDP_DEPAY_STATE_EVENT:
{
guint8 *payload;
GST_LOG_OBJECT (this, "reading GDP event from adapter"); GST_LOG_OBJECT (this, "reading GDP event from adapter");
/* adapter doesn't like 0 length payload */ /* adapter doesn't like 0 length payload */
if (this->payload_length > 0) if (this->payload_length > 0)
payload = gst_adapter_take (this->adapter, this->payload_length); payload = gst_adapter_take (this->adapter, this->payload_length);
else
payload = NULL;
event = gst_dp_event_from_packet (GST_DP_HEADER_LENGTH, this->header, event = gst_dp_event_from_packet (GST_DP_HEADER_LENGTH, this->header,
payload); payload);
if (payload) {
g_free (payload); g_free (payload);
payload = NULL; if (!event)
} goto event_failed;
if (!event) {
GST_ELEMENT_ERROR (this, STREAM, DECODE, (NULL),
("could not create event from GDP packet"));
ret = GST_FLOW_ERROR;
goto done;
}
GST_DEBUG_OBJECT (this, "sending deserialized event %p of type %s", GST_DEBUG_OBJECT (this, "sending deserialized event %p of type %s",
event, gst_event_type_get_name (event->type)); event, gst_event_type_get_name (event->type));
gst_pad_push_event (this->srcpad, event); gst_pad_push_event (this->srcpad, event);
@ -324,25 +373,60 @@ gst_gdp_depay_chain (GstPad * pad, GstBuffer * buffer)
break; break;
} }
} }
goto done; }
header_validate_error:
GST_ELEMENT_ERROR (this, STREAM, DECODE, (NULL),
("GDP packet header does not validate"));
g_free (header);
ret = GST_FLOW_ERROR;
goto done;
wrong_type:
GST_ELEMENT_ERROR (this, STREAM, DECODE, (NULL),
("GDP packet header is of wrong type"));
g_free (header);
ret = GST_FLOW_ERROR;
goto done;
done: done:
gst_object_unref (this); gst_object_unref (this);
return ret; return ret;
/* ERRORS */
header_validate_error:
{
GST_ELEMENT_ERROR (this, STREAM, DECODE, (NULL),
("GDP packet header does not validate"));
ret = GST_FLOW_ERROR;
goto done;
}
wrong_type:
{
GST_ELEMENT_ERROR (this, STREAM, DECODE, (NULL),
("GDP packet header is of wrong type"));
ret = GST_FLOW_ERROR;
goto done;
}
no_caps:
{
GST_ELEMENT_ERROR (this, STREAM, DECODE, (NULL),
("Received a buffer without first receiving caps"));
ret = GST_FLOW_NOT_NEGOTIATED;
goto done;
}
buffer_failed:
{
GST_ELEMENT_ERROR (this, STREAM, DECODE, (NULL),
("could not create buffer from GDP packet"));
ret = GST_FLOW_ERROR;
goto done;
}
push_error:
{
GST_WARNING_OBJECT (this, "pushing depayloaded buffer returned %d", ret);
goto done;
}
caps_failed:
{
GST_ELEMENT_ERROR (this, STREAM, DECODE, (NULL),
("could not create caps from GDP packet"));
ret = GST_FLOW_ERROR;
goto done;
}
event_failed:
{
GST_ELEMENT_ERROR (this, STREAM, DECODE, (NULL),
("could not create event from GDP packet"));
ret = GST_FLOW_ERROR;
goto done;
}
} }
static GstStateChangeReturn static GstStateChangeReturn
@ -354,7 +438,7 @@ gst_gdp_depay_change_state (GstElement * element, GstStateChange transition)
ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition); ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
switch (transition) { switch (transition) {
case GST_STATE_CHANGE_READY_TO_NULL: case GST_STATE_CHANGE_PAUSED_TO_READY:
if (this->caps) { if (this->caps) {
gst_caps_unref (this->caps); gst_caps_unref (this->caps);
this->caps = NULL; this->caps = NULL;
@ -363,7 +447,6 @@ gst_gdp_depay_change_state (GstElement * element, GstStateChange transition)
default: default:
break; break;
} }
return ret; return ret;
} }

View file

@ -85,8 +85,12 @@ enum
GST_BOILERPLATE_FULL (GstGDPPay, gst_gdp_pay, GstElement, GST_BOILERPLATE_FULL (GstGDPPay, gst_gdp_pay, GstElement,
GST_TYPE_ELEMENT, _do_init); GST_TYPE_ELEMENT, _do_init);
static void gst_gdp_pay_reset (GstGDPPay * this);
static GstFlowReturn gst_gdp_pay_chain (GstPad * pad, GstBuffer * buffer); static GstFlowReturn gst_gdp_pay_chain (GstPad * pad, GstBuffer * buffer);
static gboolean gst_gdp_pay_src_event (GstPad * pad, GstEvent * event);
static gboolean gst_gdp_pay_sink_event (GstPad * pad, GstEvent * event); static gboolean gst_gdp_pay_sink_event (GstPad * pad, GstEvent * event);
static GstStateChangeReturn gst_gdp_pay_change_state (GstElement * static GstStateChangeReturn gst_gdp_pay_change_state (GstElement *
element, GstStateChange transition); element, GstStateChange transition);
@ -95,7 +99,7 @@ static void gst_gdp_pay_set_property (GObject * object, guint prop_id,
static void gst_gdp_pay_get_property (GObject * object, guint prop_id, static void gst_gdp_pay_get_property (GObject * object, guint prop_id,
GValue * value, GParamSpec * pspec); GValue * value, GParamSpec * pspec);
static void gst_gdp_pay_dispose (GObject * gobject); static void gst_gdp_pay_finalize (GObject * gobject);
static void static void
gst_gdp_pay_base_init (gpointer g_class) gst_gdp_pay_base_init (gpointer g_class)
@ -121,8 +125,7 @@ gst_gdp_pay_class_init (GstGDPPayClass * klass)
gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_gdp_pay_set_property); gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_gdp_pay_set_property);
gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_gdp_pay_get_property); gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_gdp_pay_get_property);
gobject_class->dispose = GST_DEBUG_FUNCPTR (gst_gdp_pay_dispose); gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_gdp_pay_finalize);
gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_gdp_pay_change_state);
g_object_class_install_property (gobject_class, PROP_CRC_HEADER, g_object_class_install_property (gobject_class, PROP_CRC_HEADER,
g_param_spec_boolean ("crc-header", "CRC Header", g_param_spec_boolean ("crc-header", "CRC Header",
@ -136,6 +139,8 @@ gst_gdp_pay_class_init (GstGDPPayClass * klass)
g_param_spec_enum ("version", "Version", g_param_spec_enum ("version", "Version",
"Version of the GStreamer Data Protocol", "Version of the GStreamer Data Protocol",
GST_TYPE_DP_VERSION, DEFAULT_VERSION, G_PARAM_READWRITE)); GST_TYPE_DP_VERSION, DEFAULT_VERSION, G_PARAM_READWRITE));
gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_gdp_pay_change_state);
} }
static void static void
@ -151,30 +156,61 @@ gst_gdp_pay_init (GstGDPPay * gdppay, GstGDPPayClass * g_class)
gdppay->srcpad = gdppay->srcpad =
gst_pad_new_from_static_template (&gdp_pay_src_template, "src"); gst_pad_new_from_static_template (&gdp_pay_src_template, "src");
gst_pad_set_event_function (gdppay->srcpad,
GST_DEBUG_FUNCPTR (gst_gdp_pay_src_event));
gst_element_add_pad (GST_ELEMENT (gdppay), gdppay->srcpad); gst_element_add_pad (GST_ELEMENT (gdppay), gdppay->srcpad);
gdppay->offset = 0;
gdppay->crc_header = DEFAULT_CRC_HEADER; gdppay->crc_header = DEFAULT_CRC_HEADER;
gdppay->crc_payload = DEFAULT_CRC_PAYLOAD; gdppay->crc_payload = DEFAULT_CRC_PAYLOAD;
gdppay->header_flag = gdppay->crc_header | gdppay->crc_payload; gdppay->header_flag = gdppay->crc_header | gdppay->crc_payload;
gdppay->version = DEFAULT_VERSION; gdppay->version = DEFAULT_VERSION;
gdppay->offset = 0;
gdppay->packetizer = gst_dp_packetizer_new (gdppay->version);
} }
static void static void
gst_gdp_pay_dispose (GObject * gobject) gst_gdp_pay_finalize (GObject * gobject)
{ {
GstGDPPay *this = GST_GDP_PAY (gobject); GstGDPPay *this = GST_GDP_PAY (gobject);
gst_gdp_pay_reset (this);
gst_dp_packetizer_free (this->packetizer);
GST_CALL_PARENT (G_OBJECT_CLASS, finalize, (gobject));
}
static void
gst_gdp_pay_reset (GstGDPPay * this)
{
/* clear the queued buffers */
while (this->queue) {
GstBuffer *buffer;
buffer = GST_BUFFER_CAST (this->queue->data);
GST_DEBUG_OBJECT (this, "Pushing queued GDP buffer %p", buffer);
/* delete buffer from queue now */
this->queue = g_list_delete_link (this->queue, this->queue);
}
if (this->caps) {
gst_caps_unref (this->caps);
this->caps = NULL;
}
if (this->caps_buf) { if (this->caps_buf) {
gst_buffer_unref (this->caps_buf); gst_buffer_unref (this->caps_buf);
this->caps_buf = NULL; this->caps_buf = NULL;
} }
if (this->tag_buf) {
gst_buffer_unref (this->tag_buf);
this->tag_buf = NULL;
}
if (this->new_segment_buf) { if (this->new_segment_buf) {
gst_buffer_unref (this->new_segment_buf); gst_buffer_unref (this->new_segment_buf);
this->new_segment_buf = NULL; this->new_segment_buf = NULL;
} }
GST_CALL_PARENT (G_OBJECT_CLASS, dispose, (gobject)); this->sent_streamheader = FALSE;
this->offset = 0;
} }
/* set OFFSET and OFFSET_END with running count */ /* set OFFSET and OFFSET_END with running count */
@ -195,10 +231,8 @@ gst_gdp_buffer_from_caps (GstGDPPay * this, GstCaps * caps)
guint len; guint len;
if (!this->packetizer->packet_from_caps (caps, this->header_flag, &len, if (!this->packetizer->packet_from_caps (caps, this->header_flag, &len,
&header, &payload)) { &header, &payload))
GST_WARNING_OBJECT (this, "could not create GDP header from caps"); goto packet_failed;
return NULL;
}
GST_LOG_OBJECT (this, "creating GDP header and payload buffer from caps"); GST_LOG_OBJECT (this, "creating GDP header and payload buffer from caps");
headerbuf = gst_buffer_new (); headerbuf = gst_buffer_new ();
@ -211,6 +245,13 @@ gst_gdp_buffer_from_caps (GstGDPPay * this, GstCaps * caps)
GST_BUFFER_MALLOCDATA (payloadbuf) = payload; GST_BUFFER_MALLOCDATA (payloadbuf) = payload;
return gst_buffer_join (headerbuf, payloadbuf); return gst_buffer_join (headerbuf, payloadbuf);
/* ERRORS */
packet_failed:
{
GST_WARNING_OBJECT (this, "could not create GDP header from caps");
return NULL;
}
} }
static GstBuffer * static GstBuffer *
@ -221,10 +262,8 @@ gst_gdp_pay_buffer_from_buffer (GstGDPPay * this, GstBuffer * buffer)
guint len; guint len;
if (!this->packetizer->header_from_buffer (buffer, this->header_flag, &len, if (!this->packetizer->header_from_buffer (buffer, this->header_flag, &len,
&header)) { &header))
GST_WARNING_OBJECT (this, "could not create GDP header from buffer"); goto no_buffer;
return NULL;
}
GST_LOG_OBJECT (this, "creating GDP header and payload buffer from buffer"); GST_LOG_OBJECT (this, "creating GDP header and payload buffer from buffer");
headerbuf = gst_buffer_new (); headerbuf = gst_buffer_new ();
@ -233,7 +272,15 @@ gst_gdp_pay_buffer_from_buffer (GstGDPPay * this, GstBuffer * buffer)
/* we do not want to lose the ref on the incoming buffer */ /* we do not want to lose the ref on the incoming buffer */
gst_buffer_ref (buffer); gst_buffer_ref (buffer);
return gst_buffer_join (headerbuf, buffer); return gst_buffer_join (headerbuf, buffer);
/* ERRORS */
no_buffer:
{
GST_WARNING_OBJECT (this, "could not create GDP header from buffer");
return NULL;
}
} }
static GstBuffer * static GstBuffer *
@ -248,12 +295,8 @@ gst_gdp_buffer_from_event (GstGDPPay * this, GstEvent * event)
ret = ret =
this->packetizer->packet_from_event (event, this->header_flag, &len, this->packetizer->packet_from_event (event, this->header_flag, &len,
&header, &payload); &header, &payload);
if (!ret)
if (!ret) { goto no_event;
GST_WARNING_OBJECT (this, "could not create GDP header from event %s (%d)",
gst_event_type_get_name (event->type), event->type);
return NULL;
}
GST_LOG_OBJECT (this, "creating GDP header and payload buffer from event"); GST_LOG_OBJECT (this, "creating GDP header and payload buffer from event");
headerbuf = gst_buffer_new (); headerbuf = gst_buffer_new ();
@ -266,6 +309,14 @@ gst_gdp_buffer_from_event (GstGDPPay * this, GstEvent * event)
GST_BUFFER_MALLOCDATA (payloadbuf) = payload; GST_BUFFER_MALLOCDATA (payloadbuf) = payload;
return gst_buffer_join (headerbuf, payloadbuf); return gst_buffer_join (headerbuf, payloadbuf);
/* ERRORS */
no_event:
{
GST_WARNING_OBJECT (this, "could not create GDP header from event %s (%d)",
gst_event_type_get_name (event->type), event->type);
return NULL;
}
} }
@ -341,15 +392,14 @@ gst_gdp_pay_reset_streamheader (GstGDPPay * this)
bufval = &g_array_index (buffers, GValue, i); bufval = &g_array_index (buffers, GValue, i);
buffer = g_value_peek_pointer (bufval); buffer = g_value_peek_pointer (bufval);
outbuffer = gst_gdp_pay_buffer_from_buffer (this, buffer); outbuffer = gst_gdp_pay_buffer_from_buffer (this, buffer);
if (outbuffer) { if (!outbuffer)
goto no_buffer;
g_value_init (&value, GST_TYPE_BUFFER); g_value_init (&value, GST_TYPE_BUFFER);
gst_value_set_buffer (&value, outbuffer); gst_value_set_buffer (&value, outbuffer);
gst_value_array_append_value (&array, &value); gst_value_array_append_value (&array, &value);
g_value_unset (&value); g_value_unset (&value);
} }
/* FIXME: if one or more in this loop fail to produce an outbuffer,
* should we error out ? Once ? Every time ? */
}
} }
caps = gst_caps_from_string ("application/x-gdp"); caps = gst_caps_from_string ("application/x-gdp");
@ -406,38 +456,50 @@ gst_gdp_pay_reset_streamheader (GstGDPPay * this)
this->sent_streamheader = TRUE; this->sent_streamheader = TRUE;
GST_DEBUG_OBJECT (this, "need to push %d queued buffers", GST_DEBUG_OBJECT (this, "need to push %d queued buffers",
g_list_length (this->queue)); g_list_length (this->queue));
if (this->queue) { while (this->queue) {
GList *l; GstBuffer *buffer;
for (l = this->queue; l; l = g_list_next (l)) { buffer = GST_BUFFER_CAST (this->queue->data);
GST_DEBUG_OBJECT (this, "Pushing queued GDP buffer %p", l->data); GST_DEBUG_OBJECT (this, "Pushing queued GDP buffer %p", buffer);
gst_buffer_set_caps (l->data, caps);
r = gst_pad_push (this->srcpad, l->data); /* delete buffer from queue now */
this->queue = g_list_delete_link (this->queue, this->queue);
/* set caps en push */
gst_buffer_set_caps (buffer, caps);
r = gst_pad_push (this->srcpad, buffer);
if (r != GST_FLOW_OK) { if (r != GST_FLOW_OK) {
GST_WARNING_OBJECT (this, "pushing queued GDP buffer returned %d", r); GST_WARNING_OBJECT (this, "pushing queued GDP buffer returned %d", r);
return r; return r;
} }
} }
}
return r; return r;
/* ERRORS */
no_buffer:
{
GST_ELEMENT_ERROR (this, STREAM, FORMAT, (NULL),
("failed to create GDP buffer from streamheader"));
return GST_FLOW_ERROR;
}
} }
/* queue a buffer internally if we haven't sent streamheader buffers yet; /* queue a buffer internally if we haven't sent streamheader buffers yet;
* otherwise, just push on */ * otherwise, just push on, this takes ownership of the buffer. */
static GstFlowReturn static GstFlowReturn
gst_gdp_queue_buffer (GstGDPPay * this, GstBuffer * buffer) gst_gdp_queue_buffer (GstGDPPay * this, GstBuffer * buffer)
{ {
if (this->sent_streamheader) { if (this->sent_streamheader) {
GST_LOG_OBJECT (this, "Pushing GDP buffer %p", buffer); GST_LOG_OBJECT (this, "Pushing GDP buffer %p, caps %" GST_PTR_FORMAT,
GST_LOG_OBJECT (this, "set caps %" GST_PTR_FORMAT, this->caps); buffer, this->caps);
return gst_pad_push (this->srcpad, buffer); return gst_pad_push (this->srcpad, buffer);
} }
/* store it on an internal queue */ /* store it on an internal queue. buffer remains reffed. */
this->queue = g_list_append (this->queue, buffer); this->queue = g_list_append (this->queue, buffer);
GST_DEBUG_OBJECT (this, "queued buffer %p, now %d buffers queued", GST_DEBUG_OBJECT (this, "queued buffer %p, now %d buffers queued",
buffer, g_list_length (this->queue)); buffer, g_list_length (this->queue));
return GST_FLOW_OK; return GST_FLOW_OK;
} }
@ -466,12 +528,7 @@ gst_gdp_pay_chain (GstPad * pad, GstBuffer * buffer)
if (!outbuffer) { if (!outbuffer) {
GST_ELEMENT_WARNING (this, STREAM, ENCODE, (NULL), GST_ELEMENT_WARNING (this, STREAM, ENCODE, (NULL),
("Could not create GDP buffer from new segment event")); ("Could not create GDP buffer from new segment event"));
/*
ret = GST_FLOW_ERROR;
goto done;
*/
} else { } else {
gst_gdp_stamp_buffer (this, outbuffer); gst_gdp_stamp_buffer (this, outbuffer);
GST_BUFFER_TIMESTAMP (outbuffer) = GST_BUFFER_TIMESTAMP (buffer); GST_BUFFER_TIMESTAMP (outbuffer) = GST_BUFFER_TIMESTAMP (buffer);
GST_BUFFER_DURATION (outbuffer) = 0; GST_BUFFER_DURATION (outbuffer) = 0;
@ -483,26 +540,16 @@ gst_gdp_pay_chain (GstPad * pad, GstBuffer * buffer)
/* make sure we've received caps before */ /* make sure we've received caps before */
caps = gst_buffer_get_caps (buffer); caps = gst_buffer_get_caps (buffer);
if (!this->caps && !caps) { if (!this->caps && !caps)
GST_WARNING_OBJECT (this, "first received buffer does not have caps set"); goto no_caps;
if (caps)
gst_caps_unref (caps);
ret = GST_FLOW_NOT_NEGOTIATED;
goto done;
}
/* if the caps have changed, process caps first */ /* if the caps have changed, process caps first */
if (caps && !gst_caps_is_equal (this->caps, caps)) { if (caps && !gst_caps_is_equal (this->caps, caps)) {
GST_LOG_OBJECT (this, "caps changed to %p, %" GST_PTR_FORMAT, caps, caps); GST_LOG_OBJECT (this, "caps changed to %p, %" GST_PTR_FORMAT, caps, caps);
gst_caps_replace (&(this->caps), caps); gst_caps_replace (&(this->caps), caps);
outbuffer = gst_gdp_buffer_from_caps (this, caps); outbuffer = gst_gdp_buffer_from_caps (this, caps);
if (!outbuffer) { if (!outbuffer)
GST_ELEMENT_ERROR (this, STREAM, ENCODE, (NULL), goto no_caps_buffer;
("Could not create GDP buffer from caps %" GST_PTR_FORMAT, caps));
gst_caps_unref (caps);
ret = GST_FLOW_ERROR;
goto done;
}
gst_gdp_stamp_buffer (this, outbuffer); gst_gdp_stamp_buffer (this, outbuffer);
GST_BUFFER_TIMESTAMP (outbuffer) = GST_BUFFER_TIMESTAMP (buffer); GST_BUFFER_TIMESTAMP (outbuffer) = GST_BUFFER_TIMESTAMP (buffer);
@ -515,12 +562,8 @@ gst_gdp_pay_chain (GstPad * pad, GstBuffer * buffer)
/* create a GDP header packet, /* create a GDP header packet,
* then create a GST buffer of the header packet and the buffer contents */ * then create a GST buffer of the header packet and the buffer contents */
outbuffer = gst_gdp_pay_buffer_from_buffer (this, buffer); outbuffer = gst_gdp_pay_buffer_from_buffer (this, buffer);
if (!outbuffer) { if (!outbuffer)
GST_ELEMENT_ERROR (this, STREAM, ENCODE, (NULL), goto no_buffer;
("Could not create GDP buffer from buffer"));
ret = GST_FLOW_ERROR;
goto done;
}
gst_gdp_stamp_buffer (this, outbuffer); gst_gdp_stamp_buffer (this, outbuffer);
GST_BUFFER_TIMESTAMP (outbuffer) = GST_BUFFER_TIMESTAMP (buffer); GST_BUFFER_TIMESTAMP (outbuffer) = GST_BUFFER_TIMESTAMP (buffer);
@ -532,6 +575,34 @@ done:
gst_buffer_unref (buffer); gst_buffer_unref (buffer);
gst_object_unref (this); gst_object_unref (this);
return ret; return ret;
/* ERRORS */
no_caps:
{
/* when returning a fatal error as a GstFlowReturn we must post an error
* message */
GST_ELEMENT_ERROR (this, STREAM, FORMAT, (NULL),
("first received buffer does not have caps set"));
if (caps)
gst_caps_unref (caps);
ret = GST_FLOW_NOT_NEGOTIATED;
goto done;
}
no_caps_buffer:
{
GST_ELEMENT_ERROR (this, STREAM, ENCODE, (NULL),
("Could not create GDP buffer from caps %" GST_PTR_FORMAT, caps));
gst_caps_unref (caps);
ret = GST_FLOW_ERROR;
goto done;
}
no_buffer:
{
GST_ELEMENT_ERROR (this, STREAM, ENCODE, (NULL),
("Could not create GDP buffer from buffer"));
ret = GST_FLOW_ERROR;
goto done;
}
} }
static gboolean static gboolean
@ -547,13 +618,9 @@ gst_gdp_pay_sink_event (GstPad * pad, GstEvent * event)
/* now turn the event into a buffer */ /* now turn the event into a buffer */
outbuffer = gst_gdp_buffer_from_event (this, event); outbuffer = gst_gdp_buffer_from_event (this, event);
if (!outbuffer) { if (!outbuffer)
GST_ELEMENT_WARNING (this, STREAM, ENCODE, (NULL), goto no_outbuffer;
("Could not create GDP buffer from received event (type %s)",
gst_event_type_get_name (event->type)));
ret = FALSE;
goto done;
}
gst_gdp_stamp_buffer (this, outbuffer); gst_gdp_stamp_buffer (this, outbuffer);
GST_BUFFER_TIMESTAMP (outbuffer) = GST_EVENT_TIMESTAMP (event); GST_BUFFER_TIMESTAMP (outbuffer) = GST_EVENT_TIMESTAMP (event);
GST_BUFFER_DURATION (outbuffer) = 0; GST_BUFFER_DURATION (outbuffer) = 0;
@ -562,47 +629,87 @@ gst_gdp_pay_sink_event (GstPad * pad, GstEvent * event)
* and not send it on */ * and not send it on */
switch (GST_EVENT_TYPE (event)) { switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_NEWSEGMENT: case GST_EVENT_NEWSEGMENT:
GST_DEBUG_OBJECT (this, "received new_segment event");
if (this->new_segment_buf) {
gst_buffer_unref (this->new_segment_buf);
}
GST_DEBUG_OBJECT (this, "Storing buffer %p as new_segment_buf", GST_DEBUG_OBJECT (this, "Storing buffer %p as new_segment_buf",
outbuffer); outbuffer);
if (this->new_segment_buf)
gst_buffer_unref (this->new_segment_buf);
this->new_segment_buf = outbuffer; this->new_segment_buf = outbuffer;
gst_gdp_pay_reset_streamheader (this); gst_gdp_pay_reset_streamheader (this);
break; break;
case GST_EVENT_TAG: case GST_EVENT_TAG:
GST_DEBUG_OBJECT (this, "received tag event");
if (this->tag_buf) {
gst_buffer_unref (this->tag_buf);
}
GST_DEBUG_OBJECT (this, "Storing buffer %p as tag_buf", outbuffer); GST_DEBUG_OBJECT (this, "Storing buffer %p as tag_buf", outbuffer);
if (this->tag_buf)
gst_buffer_unref (this->tag_buf);
this->tag_buf = outbuffer; this->tag_buf = outbuffer;
gst_gdp_pay_reset_streamheader (this); gst_gdp_pay_reset_streamheader (this);
break; break;
default: default:
GST_DEBUG_OBJECT (this, "queuing GDP buffer %p of event %p", outbuffer, GST_DEBUG_OBJECT (this, "queuing GDP buffer %p of event %p", outbuffer,
event); event);
flowret = gst_gdp_queue_buffer (this, outbuffer); flowret = gst_gdp_queue_buffer (this, outbuffer);
if (flowret != GST_FLOW_OK) { if (flowret != GST_FLOW_OK)
GST_WARNING_OBJECT (this, "queueing GDP event buffer returned %d", goto push_error;
flowret);
ret = FALSE;
goto done;
}
break; break;
} }
/* if we have EOS, we should send on EOS ourselves */ /* if we have EOS, we should send on EOS ourselves */
if (GST_EVENT_TYPE (event) == GST_EVENT_EOS) { if (GST_EVENT_TYPE (event) == GST_EVENT_EOS) {
GST_DEBUG_OBJECT (this, "Sending on EOS event %p", event); GST_DEBUG_OBJECT (this, "Sending on EOS event %p", event);
return gst_pad_push_event (this->srcpad, event); /* ref, we unref later again */
}; ret = gst_pad_push_event (this->srcpad, gst_event_ref (event));
}
done: done:
gst_object_unref (this);
gst_event_unref (event); gst_event_unref (event);
gst_object_unref (this);
return ret; return ret;
/* ERRORS */
no_outbuffer:
{
GST_ELEMENT_WARNING (this, STREAM, ENCODE, (NULL),
("Could not create GDP buffer from received event (type %s)",
gst_event_type_get_name (event->type)));
ret = FALSE;
goto done;
}
push_error:
{
GST_WARNING_OBJECT (this, "queueing GDP event buffer returned %d", flowret);
ret = FALSE;
goto done;
}
}
static gboolean
gst_gdp_pay_src_event (GstPad * pad, GstEvent * event)
{
GstGDPPay *this;
gboolean res = TRUE;
this = GST_GDP_PAY (gst_pad_get_parent (pad));
switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_SEEK:
/* we refuse seek for now. */
gst_event_unref (event);
res = FALSE;
break;
case GST_EVENT_QOS:
case GST_EVENT_NAVIGATION:
default:
/* everything else is passed */
res = gst_pad_push_event (this->sinkpad, event);
break;
}
gst_object_unref (this);
return res;
} }
static void static void
@ -667,7 +774,6 @@ gst_gdp_pay_change_state (GstElement * element, GstStateChange transition)
switch (transition) { switch (transition) {
case GST_STATE_CHANGE_READY_TO_PAUSED: case GST_STATE_CHANGE_READY_TO_PAUSED:
this->packetizer = gst_dp_packetizer_new (this->version);
break; break;
default: default:
break; break;
@ -677,16 +783,7 @@ gst_gdp_pay_change_state (GstElement * element, GstStateChange transition)
switch (transition) { switch (transition) {
case GST_STATE_CHANGE_PAUSED_TO_READY: case GST_STATE_CHANGE_PAUSED_TO_READY:
if (this->packetizer) { gst_gdp_pay_reset (this);
gst_dp_packetizer_free (this->packetizer);
this->packetizer = NULL;
}
break;
case GST_STATE_CHANGE_READY_TO_NULL:
if (this->caps) {
gst_caps_unref (this->caps);
this->caps = NULL;
}
break; break;
default: default:
break; break;

View file

@ -41,6 +41,7 @@ typedef struct _GstGDPPayClass GstGDPPayClass;
struct _GstGDPPay struct _GstGDPPay
{ {
GstElement element; GstElement element;
GstPad *sinkpad; GstPad *sinkpad;
GstPad *srcpad; GstPad *srcpad;