gst/gdp/gstgdpdepay.c: Disable seeking.

Original commit message from CVS:
* gst/gdp/gstgdpdepay.c: (gst_gdp_depay_init),
(gst_gdp_depay_finalize), (gst_gdp_depay_sink_event),
(gst_gdp_depay_src_event), (gst_gdp_depay_chain),
(gst_gdp_depay_change_state):
Disable seeking.
Small cleanups.
Clear adapter on disconts.
Clear caps when going to READY instead of NULL
* gst/gdp/gstgdppay.c: (gst_gdp_pay_class_init),
(gst_gdp_pay_init), (gst_gdp_pay_finalize), (gst_gdp_pay_reset),
(gst_gdp_buffer_from_caps), (gst_gdp_pay_buffer_from_buffer),
(gst_gdp_buffer_from_event), (gst_gdp_pay_reset_streamheader),
(gst_gdp_queue_buffer), (gst_gdp_pay_chain),
(gst_gdp_pay_sink_event), (gst_gdp_pay_src_event),
(gst_gdp_pay_change_state):
* gst/gdp/gstgdppay.h:
Reset payloader when going to READY.
Fix leaked buffers in ->queue on push errors.
Disable seeking.
Code cleanups.
Create packetizer in _init, free in _finalize.
This commit is contained in:
Wim Taymans 2006-08-02 16:56:19 +00:00
parent c98740d77a
commit 5929ad4ac9
4 changed files with 406 additions and 200 deletions

View file

@ -1,3 +1,28 @@
2006-08-02 Wim Taymans <wim@fluendo.com>
* gst/gdp/gstgdpdepay.c: (gst_gdp_depay_init),
(gst_gdp_depay_finalize), (gst_gdp_depay_sink_event),
(gst_gdp_depay_src_event), (gst_gdp_depay_chain),
(gst_gdp_depay_change_state):
Disable seeking.
Small cleanups.
Clear adapter on disconts.
Clear caps when going to READY instead of NULL
* gst/gdp/gstgdppay.c: (gst_gdp_pay_class_init),
(gst_gdp_pay_init), (gst_gdp_pay_finalize), (gst_gdp_pay_reset),
(gst_gdp_buffer_from_caps), (gst_gdp_pay_buffer_from_buffer),
(gst_gdp_buffer_from_event), (gst_gdp_pay_reset_streamheader),
(gst_gdp_queue_buffer), (gst_gdp_pay_chain),
(gst_gdp_pay_sink_event), (gst_gdp_pay_src_event),
(gst_gdp_pay_change_state):
* gst/gdp/gstgdppay.h:
Reset payloader when going to READY.
Fix leaked buffers in ->queue on push errors.
Disable seeking.
Code cleanups.
Create packetizer in _init, free in _finalize.
2006-07-31 Julien MOUTTE <julien@moutte.net>
* ext/directfb/dfbvideosink.c: (gst_dfbvideosink_change_state),

View file

@ -56,7 +56,6 @@ GST_ELEMENT_DETAILS ("GDP Depayloader",
enum
{
PROP_0,
/* FILL ME */
};
static GstStaticPadTemplate gdp_depay_sink_template =
@ -82,6 +81,8 @@ GST_BOILERPLATE_FULL (GstGDPDepay, gst_gdp_depay, GstElement,
GST_TYPE_ELEMENT, _do_init);
static gboolean gst_gdp_depay_sink_event (GstPad * pad, GstEvent * event);
static gboolean gst_gdp_depay_src_event (GstPad * pad, GstEvent * event);
static GstFlowReturn gst_gdp_depay_chain (GstPad * pad, GstBuffer * buffer);
static GstStateChangeReturn gst_gdp_depay_change_state (GstElement *
@ -129,6 +130,8 @@ gst_gdp_depay_init (GstGDPDepay * gdpdepay, GstGDPDepayClass * g_class)
gdpdepay->srcpad =
gst_pad_new_from_static_template (&gdp_depay_src_template, "src");
gst_pad_set_event_function (gdpdepay->srcpad,
GST_DEBUG_FUNCPTR (gst_gdp_depay_src_event));
/* our caps will always be decided by the incoming GDP caps buffers */
gst_pad_use_fixed_caps (gdpdepay->srcpad);
gst_element_add_pad (GST_ELEMENT (gdpdepay), gdpdepay->srcpad);
@ -144,19 +147,13 @@ gst_gdp_depay_finalize (GObject * gobject)
this = GST_GDP_DEPAY (gobject);
if (this->caps)
gst_caps_unref (this->caps);
if (this->header)
g_free (this->header);
g_free (this->header);
gst_adapter_clear (this->adapter);
g_object_unref (this->adapter);
GST_CALL_PARENT (G_OBJECT_CLASS, finalize, (gobject));
}
/* we ignore most events because the only events we want to output are those
* found in the gdp data stream.
* An exception is the EOS event, if we get that on the sinkpad, we certainly
* are not going to produce more data.
*/
static gboolean
gst_gdp_depay_sink_event (GstPad * pad, GstEvent * event)
{
@ -166,10 +163,20 @@ gst_gdp_depay_sink_event (GstPad * pad, GstEvent * event)
this = GST_GDP_DEPAY (gst_pad_get_parent (pad));
switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_EOS:
case GST_EVENT_FLUSH_START:
case GST_EVENT_FLUSH_STOP:
/* forward flush start and stop */
res = gst_pad_push_event (this->srcpad, event);
break;
case GST_EVENT_EOS:
/* after EOS, we don't expect to output anything anymore */
res = gst_pad_push_event (this->srcpad, event);
break;
case GST_EVENT_NEWSEGMENT:
case GST_EVENT_TAG:
case GST_EVENT_BUFFERSIZE:
default:
/* we unref most events as we take them from the datastream */
gst_event_unref (event);
break;
}
@ -178,6 +185,32 @@ gst_gdp_depay_sink_event (GstPad * pad, GstEvent * event)
return res;
}
static gboolean
gst_gdp_depay_src_event (GstPad * pad, GstEvent * event)
{
GstGDPDepay *this;
gboolean res = TRUE;
this = GST_GDP_DEPAY (gst_pad_get_parent (pad));
switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_SEEK:
/* we refuse seek for now. */
gst_event_unref (event);
res = FALSE;
break;
case GST_EVENT_QOS:
case GST_EVENT_NAVIGATION:
default:
/* everything else is passed */
res = gst_pad_push_event (this->sinkpad, event);
break;
}
gst_object_unref (this);
return res;
}
static GstFlowReturn
gst_gdp_depay_chain (GstPad * pad, GstBuffer * buffer)
{
@ -186,108 +219,124 @@ gst_gdp_depay_chain (GstPad * pad, GstBuffer * buffer)
GstCaps *caps;
GstBuffer *buf;
GstEvent *event;
guint8 *header = NULL;
guint8 *payload = NULL;
guint available;
gboolean running = TRUE;
this = GST_GDP_DEPAY (gst_pad_get_parent (pad));
/* On DISCONT, get rid of accumulated data. We assume a buffer after the
* DISCONT contains (part of) a new valid header, if not we error because we
* lost sync */
if (GST_BUFFER_IS_DISCONT (buffer)) {
gst_adapter_clear (this->adapter);
this->state = GST_GDP_DEPAY_STATE_HEADER;
}
gst_adapter_push (this->adapter, buffer);
while (running) {
while (TRUE) {
switch (this->state) {
case GST_GDP_DEPAY_STATE_HEADER:
available = gst_adapter_available (this->adapter);
if (available < GST_DP_HEADER_LENGTH) {
running = FALSE;
break;
}
{
guint8 *header;
/* collect a complete header, validate and store the header. Figure out
* the payload length and switch to the PAYLOAD state */
available = gst_adapter_available (this->adapter);
if (available < GST_DP_HEADER_LENGTH)
goto done;
if (this->header)
g_free (this->header);
GST_LOG_OBJECT (this, "reading GDP header from adapter");
header = gst_adapter_take (this->adapter, GST_DP_HEADER_LENGTH);
if (!gst_dp_validate_header (GST_DP_HEADER_LENGTH, header))
if (!gst_dp_validate_header (GST_DP_HEADER_LENGTH, header)) {
g_free (header);
goto header_validate_error;
}
/* store types and payload length. Also store the header, which we need
* to make the payload. */
this->payload_length = gst_dp_header_payload_length (header);
this->payload_type = gst_dp_header_payload_type (header);
/* free previous header and store new one. */
g_free (this->header);
this->header = header;
GST_LOG_OBJECT (this,
"read GDP header, payload size %d, switching to state PAYLOAD",
this->payload_length);
this->state = GST_GDP_DEPAY_STATE_PAYLOAD;
break;
}
case GST_GDP_DEPAY_STATE_PAYLOAD:
{
/* in this state we wait for all the payload data to be available in the
* adapter. Then we switch to the state where we actually process the
* payload. */
available = gst_adapter_available (this->adapter);
if (available < this->payload_length) {
running = FALSE;
break;
}
if (available < this->payload_length)
goto done;
/* change state based on type */
if (this->payload_type == GST_DP_PAYLOAD_BUFFER) {
GST_LOG_OBJECT (this, "switching to state BUFFER");
this->state = GST_GDP_DEPAY_STATE_BUFFER;
} else if (this->payload_type == GST_DP_PAYLOAD_CAPS) {
GST_LOG_OBJECT (this, "switching to state CAPS");
this->state = GST_GDP_DEPAY_STATE_CAPS;
} else if (this->payload_type >= GST_DP_PAYLOAD_EVENT_NONE) {
GST_LOG_OBJECT (this, "switching to state EVENT");
this->state = GST_GDP_DEPAY_STATE_EVENT;
} else
goto wrong_type;
break;
case GST_GDP_DEPAY_STATE_BUFFER:
if (!this->caps) {
GST_ELEMENT_ERROR (this, STREAM, DECODE, (NULL),
("Received a buffer without first receiving caps"));
ret = GST_FLOW_NOT_NEGOTIATED;
goto done;
switch (this->payload_type) {
case GST_DP_PAYLOAD_BUFFER:
GST_LOG_OBJECT (this, "switching to state BUFFER");
this->state = GST_GDP_DEPAY_STATE_BUFFER;
break;
case GST_DP_PAYLOAD_CAPS:
GST_LOG_OBJECT (this, "switching to state CAPS");
this->state = GST_GDP_DEPAY_STATE_CAPS;
break;
case GST_DP_PAYLOAD_EVENT_NONE:
GST_LOG_OBJECT (this, "switching to state EVENT");
this->state = GST_GDP_DEPAY_STATE_EVENT;
break;
default:
goto wrong_type;
}
break;
}
case GST_GDP_DEPAY_STATE_BUFFER:
{
/* if we receive a buffer without caps first, we error out */
if (!this->caps)
goto no_caps;
GST_LOG_OBJECT (this, "reading GDP buffer from adapter");
buf = gst_dp_buffer_from_header (GST_DP_HEADER_LENGTH, this->header);
if (!buf) {
GST_ELEMENT_ERROR (this, STREAM, DECODE, (NULL),
("could not create buffer from GDP packet"));
ret = GST_FLOW_ERROR;
goto done;
if (!buf)
goto buffer_failed;
/* now take the payload if there is any */
if (this->payload_length > 0) {
guint8 *payload;
payload = gst_adapter_take (this->adapter, this->payload_length);
memcpy (GST_BUFFER_DATA (buf), payload, this->payload_length);
g_free (payload);
}
payload = gst_adapter_take (this->adapter, this->payload_length);
memcpy (GST_BUFFER_DATA (buf), payload, this->payload_length);
g_free (payload);
payload = NULL;
/* set caps and push */
gst_buffer_set_caps (buf, this->caps);
ret = gst_pad_push (this->srcpad, buf);
if (ret != GST_FLOW_OK) {
GST_WARNING_OBJECT (this, "pushing depayloaded buffer returned %d",
ret);
goto done;
}
if (ret != GST_FLOW_OK)
goto push_error;
GST_LOG_OBJECT (this, "switching to state HEADER");
this->state = GST_GDP_DEPAY_STATE_HEADER;
break;
}
case GST_GDP_DEPAY_STATE_CAPS:
{
guint8 *payload;
/* take the payload of the caps */
GST_LOG_OBJECT (this, "reading GDP caps from adapter");
payload = gst_adapter_take (this->adapter, this->payload_length);
caps = gst_dp_caps_from_packet (GST_DP_HEADER_LENGTH, this->header,
payload);
g_free (payload);
payload = NULL;
if (!caps) {
GST_ELEMENT_ERROR (this, STREAM, DECODE, (NULL),
("could not create caps from GDP packet"));
ret = GST_FLOW_ERROR;
goto done;
}
if (!caps)
goto caps_failed;
GST_DEBUG_OBJECT (this, "read caps %" GST_PTR_FORMAT, caps);
gst_caps_replace (&(this->caps), caps);
gst_pad_set_caps (this->srcpad, caps);
@ -297,24 +346,24 @@ gst_gdp_depay_chain (GstPad * pad, GstBuffer * buffer)
GST_LOG_OBJECT (this, "switching to state HEADER");
this->state = GST_GDP_DEPAY_STATE_HEADER;
break;
}
case GST_GDP_DEPAY_STATE_EVENT:
{
guint8 *payload;
GST_LOG_OBJECT (this, "reading GDP event from adapter");
/* adapter doesn't like 0 length payload */
if (this->payload_length > 0)
payload = gst_adapter_take (this->adapter, this->payload_length);
else
payload = NULL;
event = gst_dp_event_from_packet (GST_DP_HEADER_LENGTH, this->header,
payload);
if (payload) {
g_free (payload);
payload = NULL;
}
if (!event) {
GST_ELEMENT_ERROR (this, STREAM, DECODE, (NULL),
("could not create event from GDP packet"));
ret = GST_FLOW_ERROR;
goto done;
}
g_free (payload);
if (!event)
goto event_failed;
GST_DEBUG_OBJECT (this, "sending deserialized event %p of type %s",
event, gst_event_type_get_name (event->type));
gst_pad_push_event (this->srcpad, event);
@ -322,27 +371,62 @@ gst_gdp_depay_chain (GstPad * pad, GstBuffer * buffer)
GST_LOG_OBJECT (this, "switching to state HEADER");
this->state = GST_GDP_DEPAY_STATE_HEADER;
break;
}
}
}
goto done;
header_validate_error:
GST_ELEMENT_ERROR (this, STREAM, DECODE, (NULL),
("GDP packet header does not validate"));
g_free (header);
ret = GST_FLOW_ERROR;
goto done;
wrong_type:
GST_ELEMENT_ERROR (this, STREAM, DECODE, (NULL),
("GDP packet header is of wrong type"));
g_free (header);
ret = GST_FLOW_ERROR;
goto done;
done:
gst_object_unref (this);
return ret;
/* ERRORS */
header_validate_error:
{
GST_ELEMENT_ERROR (this, STREAM, DECODE, (NULL),
("GDP packet header does not validate"));
ret = GST_FLOW_ERROR;
goto done;
}
wrong_type:
{
GST_ELEMENT_ERROR (this, STREAM, DECODE, (NULL),
("GDP packet header is of wrong type"));
ret = GST_FLOW_ERROR;
goto done;
}
no_caps:
{
GST_ELEMENT_ERROR (this, STREAM, DECODE, (NULL),
("Received a buffer without first receiving caps"));
ret = GST_FLOW_NOT_NEGOTIATED;
goto done;
}
buffer_failed:
{
GST_ELEMENT_ERROR (this, STREAM, DECODE, (NULL),
("could not create buffer from GDP packet"));
ret = GST_FLOW_ERROR;
goto done;
}
push_error:
{
GST_WARNING_OBJECT (this, "pushing depayloaded buffer returned %d", ret);
goto done;
}
caps_failed:
{
GST_ELEMENT_ERROR (this, STREAM, DECODE, (NULL),
("could not create caps from GDP packet"));
ret = GST_FLOW_ERROR;
goto done;
}
event_failed:
{
GST_ELEMENT_ERROR (this, STREAM, DECODE, (NULL),
("could not create event from GDP packet"));
ret = GST_FLOW_ERROR;
goto done;
}
}
static GstStateChangeReturn
@ -354,7 +438,7 @@ gst_gdp_depay_change_state (GstElement * element, GstStateChange transition)
ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
switch (transition) {
case GST_STATE_CHANGE_READY_TO_NULL:
case GST_STATE_CHANGE_PAUSED_TO_READY:
if (this->caps) {
gst_caps_unref (this->caps);
this->caps = NULL;
@ -363,7 +447,6 @@ gst_gdp_depay_change_state (GstElement * element, GstStateChange transition)
default:
break;
}
return ret;
}

View file

@ -85,8 +85,12 @@ enum
GST_BOILERPLATE_FULL (GstGDPPay, gst_gdp_pay, GstElement,
GST_TYPE_ELEMENT, _do_init);
static void gst_gdp_pay_reset (GstGDPPay * this);
static GstFlowReturn gst_gdp_pay_chain (GstPad * pad, GstBuffer * buffer);
static gboolean gst_gdp_pay_src_event (GstPad * pad, GstEvent * event);
static gboolean gst_gdp_pay_sink_event (GstPad * pad, GstEvent * event);
static GstStateChangeReturn gst_gdp_pay_change_state (GstElement *
element, GstStateChange transition);
@ -95,7 +99,7 @@ static void gst_gdp_pay_set_property (GObject * object, guint prop_id,
static void gst_gdp_pay_get_property (GObject * object, guint prop_id,
GValue * value, GParamSpec * pspec);
static void gst_gdp_pay_dispose (GObject * gobject);
static void gst_gdp_pay_finalize (GObject * gobject);
static void
gst_gdp_pay_base_init (gpointer g_class)
@ -121,8 +125,7 @@ gst_gdp_pay_class_init (GstGDPPayClass * klass)
gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_gdp_pay_set_property);
gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_gdp_pay_get_property);
gobject_class->dispose = GST_DEBUG_FUNCPTR (gst_gdp_pay_dispose);
gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_gdp_pay_change_state);
gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_gdp_pay_finalize);
g_object_class_install_property (gobject_class, PROP_CRC_HEADER,
g_param_spec_boolean ("crc-header", "CRC Header",
@ -136,6 +139,8 @@ gst_gdp_pay_class_init (GstGDPPayClass * klass)
g_param_spec_enum ("version", "Version",
"Version of the GStreamer Data Protocol",
GST_TYPE_DP_VERSION, DEFAULT_VERSION, G_PARAM_READWRITE));
gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_gdp_pay_change_state);
}
static void
@ -151,30 +156,61 @@ gst_gdp_pay_init (GstGDPPay * gdppay, GstGDPPayClass * g_class)
gdppay->srcpad =
gst_pad_new_from_static_template (&gdp_pay_src_template, "src");
gst_pad_set_event_function (gdppay->srcpad,
GST_DEBUG_FUNCPTR (gst_gdp_pay_src_event));
gst_element_add_pad (GST_ELEMENT (gdppay), gdppay->srcpad);
gdppay->offset = 0;
gdppay->crc_header = DEFAULT_CRC_HEADER;
gdppay->crc_payload = DEFAULT_CRC_PAYLOAD;
gdppay->header_flag = gdppay->crc_header | gdppay->crc_payload;
gdppay->version = DEFAULT_VERSION;
gdppay->offset = 0;
gdppay->packetizer = gst_dp_packetizer_new (gdppay->version);
}
static void
gst_gdp_pay_dispose (GObject * gobject)
gst_gdp_pay_finalize (GObject * gobject)
{
GstGDPPay *this = GST_GDP_PAY (gobject);
gst_gdp_pay_reset (this);
gst_dp_packetizer_free (this->packetizer);
GST_CALL_PARENT (G_OBJECT_CLASS, finalize, (gobject));
}
static void
gst_gdp_pay_reset (GstGDPPay * this)
{
/* clear the queued buffers */
while (this->queue) {
GstBuffer *buffer;
buffer = GST_BUFFER_CAST (this->queue->data);
GST_DEBUG_OBJECT (this, "Pushing queued GDP buffer %p", buffer);
/* delete buffer from queue now */
this->queue = g_list_delete_link (this->queue, this->queue);
}
if (this->caps) {
gst_caps_unref (this->caps);
this->caps = NULL;
}
if (this->caps_buf) {
gst_buffer_unref (this->caps_buf);
this->caps_buf = NULL;
}
if (this->tag_buf) {
gst_buffer_unref (this->tag_buf);
this->tag_buf = NULL;
}
if (this->new_segment_buf) {
gst_buffer_unref (this->new_segment_buf);
this->new_segment_buf = NULL;
}
GST_CALL_PARENT (G_OBJECT_CLASS, dispose, (gobject));
this->sent_streamheader = FALSE;
this->offset = 0;
}
/* set OFFSET and OFFSET_END with running count */
@ -195,10 +231,8 @@ gst_gdp_buffer_from_caps (GstGDPPay * this, GstCaps * caps)
guint len;
if (!this->packetizer->packet_from_caps (caps, this->header_flag, &len,
&header, &payload)) {
GST_WARNING_OBJECT (this, "could not create GDP header from caps");
return NULL;
}
&header, &payload))
goto packet_failed;
GST_LOG_OBJECT (this, "creating GDP header and payload buffer from caps");
headerbuf = gst_buffer_new ();
@ -211,6 +245,13 @@ gst_gdp_buffer_from_caps (GstGDPPay * this, GstCaps * caps)
GST_BUFFER_MALLOCDATA (payloadbuf) = payload;
return gst_buffer_join (headerbuf, payloadbuf);
/* ERRORS */
packet_failed:
{
GST_WARNING_OBJECT (this, "could not create GDP header from caps");
return NULL;
}
}
static GstBuffer *
@ -221,10 +262,8 @@ gst_gdp_pay_buffer_from_buffer (GstGDPPay * this, GstBuffer * buffer)
guint len;
if (!this->packetizer->header_from_buffer (buffer, this->header_flag, &len,
&header)) {
GST_WARNING_OBJECT (this, "could not create GDP header from buffer");
return NULL;
}
&header))
goto no_buffer;
GST_LOG_OBJECT (this, "creating GDP header and payload buffer from buffer");
headerbuf = gst_buffer_new ();
@ -233,7 +272,15 @@ gst_gdp_pay_buffer_from_buffer (GstGDPPay * this, GstBuffer * buffer)
/* we do not want to lose the ref on the incoming buffer */
gst_buffer_ref (buffer);
return gst_buffer_join (headerbuf, buffer);
/* ERRORS */
no_buffer:
{
GST_WARNING_OBJECT (this, "could not create GDP header from buffer");
return NULL;
}
}
static GstBuffer *
@ -248,12 +295,8 @@ gst_gdp_buffer_from_event (GstGDPPay * this, GstEvent * event)
ret =
this->packetizer->packet_from_event (event, this->header_flag, &len,
&header, &payload);
if (!ret) {
GST_WARNING_OBJECT (this, "could not create GDP header from event %s (%d)",
gst_event_type_get_name (event->type), event->type);
return NULL;
}
if (!ret)
goto no_event;
GST_LOG_OBJECT (this, "creating GDP header and payload buffer from event");
headerbuf = gst_buffer_new ();
@ -266,6 +309,14 @@ gst_gdp_buffer_from_event (GstGDPPay * this, GstEvent * event)
GST_BUFFER_MALLOCDATA (payloadbuf) = payload;
return gst_buffer_join (headerbuf, payloadbuf);
/* ERRORS */
no_event:
{
GST_WARNING_OBJECT (this, "could not create GDP header from event %s (%d)",
gst_event_type_get_name (event->type), event->type);
return NULL;
}
}
@ -341,14 +392,13 @@ gst_gdp_pay_reset_streamheader (GstGDPPay * this)
bufval = &g_array_index (buffers, GValue, i);
buffer = g_value_peek_pointer (bufval);
outbuffer = gst_gdp_pay_buffer_from_buffer (this, buffer);
if (outbuffer) {
g_value_init (&value, GST_TYPE_BUFFER);
gst_value_set_buffer (&value, outbuffer);
gst_value_array_append_value (&array, &value);
g_value_unset (&value);
}
/* FIXME: if one or more in this loop fail to produce an outbuffer,
* should we error out ? Once ? Every time ? */
if (!outbuffer)
goto no_buffer;
g_value_init (&value, GST_TYPE_BUFFER);
gst_value_set_buffer (&value, outbuffer);
gst_value_array_append_value (&array, &value);
g_value_unset (&value);
}
}
@ -406,38 +456,50 @@ gst_gdp_pay_reset_streamheader (GstGDPPay * this)
this->sent_streamheader = TRUE;
GST_DEBUG_OBJECT (this, "need to push %d queued buffers",
g_list_length (this->queue));
if (this->queue) {
GList *l;
while (this->queue) {
GstBuffer *buffer;
for (l = this->queue; l; l = g_list_next (l)) {
GST_DEBUG_OBJECT (this, "Pushing queued GDP buffer %p", l->data);
gst_buffer_set_caps (l->data, caps);
r = gst_pad_push (this->srcpad, l->data);
if (r != GST_FLOW_OK) {
GST_WARNING_OBJECT (this, "pushing queued GDP buffer returned %d", r);
return r;
}
buffer = GST_BUFFER_CAST (this->queue->data);
GST_DEBUG_OBJECT (this, "Pushing queued GDP buffer %p", buffer);
/* delete buffer from queue now */
this->queue = g_list_delete_link (this->queue, this->queue);
/* set caps en push */
gst_buffer_set_caps (buffer, caps);
r = gst_pad_push (this->srcpad, buffer);
if (r != GST_FLOW_OK) {
GST_WARNING_OBJECT (this, "pushing queued GDP buffer returned %d", r);
return r;
}
}
return r;
/* ERRORS */
no_buffer:
{
GST_ELEMENT_ERROR (this, STREAM, FORMAT, (NULL),
("failed to create GDP buffer from streamheader"));
return GST_FLOW_ERROR;
}
}
/* queue a buffer internally if we haven't sent streamheader buffers yet;
* otherwise, just push on */
* otherwise, just push on, this takes ownership of the buffer. */
static GstFlowReturn
gst_gdp_queue_buffer (GstGDPPay * this, GstBuffer * buffer)
{
if (this->sent_streamheader) {
GST_LOG_OBJECT (this, "Pushing GDP buffer %p", buffer);
GST_LOG_OBJECT (this, "set caps %" GST_PTR_FORMAT, this->caps);
GST_LOG_OBJECT (this, "Pushing GDP buffer %p, caps %" GST_PTR_FORMAT,
buffer, this->caps);
return gst_pad_push (this->srcpad, buffer);
}
/* store it on an internal queue */
/* store it on an internal queue. buffer remains reffed. */
this->queue = g_list_append (this->queue, buffer);
GST_DEBUG_OBJECT (this, "queued buffer %p, now %d buffers queued",
buffer, g_list_length (this->queue));
return GST_FLOW_OK;
}
@ -466,12 +528,7 @@ gst_gdp_pay_chain (GstPad * pad, GstBuffer * buffer)
if (!outbuffer) {
GST_ELEMENT_WARNING (this, STREAM, ENCODE, (NULL),
("Could not create GDP buffer from new segment event"));
/*
ret = GST_FLOW_ERROR;
goto done;
*/
} else {
gst_gdp_stamp_buffer (this, outbuffer);
GST_BUFFER_TIMESTAMP (outbuffer) = GST_BUFFER_TIMESTAMP (buffer);
GST_BUFFER_DURATION (outbuffer) = 0;
@ -483,26 +540,16 @@ gst_gdp_pay_chain (GstPad * pad, GstBuffer * buffer)
/* make sure we've received caps before */
caps = gst_buffer_get_caps (buffer);
if (!this->caps && !caps) {
GST_WARNING_OBJECT (this, "first received buffer does not have caps set");
if (caps)
gst_caps_unref (caps);
ret = GST_FLOW_NOT_NEGOTIATED;
goto done;
}
if (!this->caps && !caps)
goto no_caps;
/* if the caps have changed, process caps first */
if (caps && !gst_caps_is_equal (this->caps, caps)) {
GST_LOG_OBJECT (this, "caps changed to %p, %" GST_PTR_FORMAT, caps, caps);
gst_caps_replace (&(this->caps), caps);
outbuffer = gst_gdp_buffer_from_caps (this, caps);
if (!outbuffer) {
GST_ELEMENT_ERROR (this, STREAM, ENCODE, (NULL),
("Could not create GDP buffer from caps %" GST_PTR_FORMAT, caps));
gst_caps_unref (caps);
ret = GST_FLOW_ERROR;
goto done;
}
if (!outbuffer)
goto no_caps_buffer;
gst_gdp_stamp_buffer (this, outbuffer);
GST_BUFFER_TIMESTAMP (outbuffer) = GST_BUFFER_TIMESTAMP (buffer);
@ -515,12 +562,8 @@ gst_gdp_pay_chain (GstPad * pad, GstBuffer * buffer)
/* create a GDP header packet,
* then create a GST buffer of the header packet and the buffer contents */
outbuffer = gst_gdp_pay_buffer_from_buffer (this, buffer);
if (!outbuffer) {
GST_ELEMENT_ERROR (this, STREAM, ENCODE, (NULL),
("Could not create GDP buffer from buffer"));
ret = GST_FLOW_ERROR;
goto done;
}
if (!outbuffer)
goto no_buffer;
gst_gdp_stamp_buffer (this, outbuffer);
GST_BUFFER_TIMESTAMP (outbuffer) = GST_BUFFER_TIMESTAMP (buffer);
@ -532,6 +575,34 @@ done:
gst_buffer_unref (buffer);
gst_object_unref (this);
return ret;
/* ERRORS */
no_caps:
{
/* when returning a fatal error as a GstFlowReturn we must post an error
* message */
GST_ELEMENT_ERROR (this, STREAM, FORMAT, (NULL),
("first received buffer does not have caps set"));
if (caps)
gst_caps_unref (caps);
ret = GST_FLOW_NOT_NEGOTIATED;
goto done;
}
no_caps_buffer:
{
GST_ELEMENT_ERROR (this, STREAM, ENCODE, (NULL),
("Could not create GDP buffer from caps %" GST_PTR_FORMAT, caps));
gst_caps_unref (caps);
ret = GST_FLOW_ERROR;
goto done;
}
no_buffer:
{
GST_ELEMENT_ERROR (this, STREAM, ENCODE, (NULL),
("Could not create GDP buffer from buffer"));
ret = GST_FLOW_ERROR;
goto done;
}
}
static gboolean
@ -547,13 +618,9 @@ gst_gdp_pay_sink_event (GstPad * pad, GstEvent * event)
/* now turn the event into a buffer */
outbuffer = gst_gdp_buffer_from_event (this, event);
if (!outbuffer) {
GST_ELEMENT_WARNING (this, STREAM, ENCODE, (NULL),
("Could not create GDP buffer from received event (type %s)",
gst_event_type_get_name (event->type)));
ret = FALSE;
goto done;
}
if (!outbuffer)
goto no_outbuffer;
gst_gdp_stamp_buffer (this, outbuffer);
GST_BUFFER_TIMESTAMP (outbuffer) = GST_EVENT_TIMESTAMP (event);
GST_BUFFER_DURATION (outbuffer) = 0;
@ -562,47 +629,87 @@ gst_gdp_pay_sink_event (GstPad * pad, GstEvent * event)
* and not send it on */
switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_NEWSEGMENT:
GST_DEBUG_OBJECT (this, "received new_segment event");
if (this->new_segment_buf) {
gst_buffer_unref (this->new_segment_buf);
}
GST_DEBUG_OBJECT (this, "Storing buffer %p as new_segment_buf",
outbuffer);
if (this->new_segment_buf)
gst_buffer_unref (this->new_segment_buf);
this->new_segment_buf = outbuffer;
gst_gdp_pay_reset_streamheader (this);
break;
case GST_EVENT_TAG:
GST_DEBUG_OBJECT (this, "received tag event");
if (this->tag_buf) {
gst_buffer_unref (this->tag_buf);
}
GST_DEBUG_OBJECT (this, "Storing buffer %p as tag_buf", outbuffer);
if (this->tag_buf)
gst_buffer_unref (this->tag_buf);
this->tag_buf = outbuffer;
gst_gdp_pay_reset_streamheader (this);
break;
default:
GST_DEBUG_OBJECT (this, "queuing GDP buffer %p of event %p", outbuffer,
event);
flowret = gst_gdp_queue_buffer (this, outbuffer);
if (flowret != GST_FLOW_OK) {
GST_WARNING_OBJECT (this, "queueing GDP event buffer returned %d",
flowret);
ret = FALSE;
goto done;
}
if (flowret != GST_FLOW_OK)
goto push_error;
break;
}
/* if we have EOS, we should send on EOS ourselves */
if (GST_EVENT_TYPE (event) == GST_EVENT_EOS) {
GST_DEBUG_OBJECT (this, "Sending on EOS event %p", event);
return gst_pad_push_event (this->srcpad, event);
};
/* ref, we unref later again */
ret = gst_pad_push_event (this->srcpad, gst_event_ref (event));
}
done:
gst_object_unref (this);
gst_event_unref (event);
gst_object_unref (this);
return ret;
/* ERRORS */
no_outbuffer:
{
GST_ELEMENT_WARNING (this, STREAM, ENCODE, (NULL),
("Could not create GDP buffer from received event (type %s)",
gst_event_type_get_name (event->type)));
ret = FALSE;
goto done;
}
push_error:
{
GST_WARNING_OBJECT (this, "queueing GDP event buffer returned %d", flowret);
ret = FALSE;
goto done;
}
}
static gboolean
gst_gdp_pay_src_event (GstPad * pad, GstEvent * event)
{
GstGDPPay *this;
gboolean res = TRUE;
this = GST_GDP_PAY (gst_pad_get_parent (pad));
switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_SEEK:
/* we refuse seek for now. */
gst_event_unref (event);
res = FALSE;
break;
case GST_EVENT_QOS:
case GST_EVENT_NAVIGATION:
default:
/* everything else is passed */
res = gst_pad_push_event (this->sinkpad, event);
break;
}
gst_object_unref (this);
return res;
}
static void
@ -667,7 +774,6 @@ gst_gdp_pay_change_state (GstElement * element, GstStateChange transition)
switch (transition) {
case GST_STATE_CHANGE_READY_TO_PAUSED:
this->packetizer = gst_dp_packetizer_new (this->version);
break;
default:
break;
@ -677,16 +783,7 @@ gst_gdp_pay_change_state (GstElement * element, GstStateChange transition)
switch (transition) {
case GST_STATE_CHANGE_PAUSED_TO_READY:
if (this->packetizer) {
gst_dp_packetizer_free (this->packetizer);
this->packetizer = NULL;
}
break;
case GST_STATE_CHANGE_READY_TO_NULL:
if (this->caps) {
gst_caps_unref (this->caps);
this->caps = NULL;
}
gst_gdp_pay_reset (this);
break;
default:
break;

View file

@ -41,6 +41,7 @@ typedef struct _GstGDPPayClass GstGDPPayClass;
struct _GstGDPPay
{
GstElement element;
GstPad *sinkpad;
GstPad *srcpad;