gst/rawparse/gstrawparse.*: Implement pull mode.

Original commit message from CVS:
* gst/rawparse/gstrawparse.c: (gst_raw_parse_class_init),
(gst_raw_parse_init), (gst_raw_parse_reset),
(gst_raw_parse_set_src_caps), (gst_raw_parse_push_buffer),
(gst_raw_parse_chain), (gst_raw_parse_loop),
(gst_raw_parse_sink_activate), (gst_raw_parse_sink_activatepull),
(gst_raw_parse_change_state), (gst_raw_parse_sink_event),
(gst_raw_parse_handle_seek_push), (gst_raw_parse_handle_seek_pull),
(gst_raw_parse_src_event), (gst_raw_parse_src_query):
* gst/rawparse/gstrawparse.h:
Implement pull mode.
This commit is contained in:
Sebastian Dröge 2008-01-18 16:56:19 +00:00
parent 63eb14e5dd
commit 19557fc2e6
4 changed files with 416 additions and 84 deletions

View file

@ -1,3 +1,16 @@
2008-01-18 Sebastian Dröge <slomo@circular-chaos.org>
* gst/rawparse/gstrawparse.c: (gst_raw_parse_class_init),
(gst_raw_parse_init), (gst_raw_parse_reset),
(gst_raw_parse_set_src_caps), (gst_raw_parse_push_buffer),
(gst_raw_parse_chain), (gst_raw_parse_loop),
(gst_raw_parse_sink_activate), (gst_raw_parse_sink_activatepull),
(gst_raw_parse_change_state), (gst_raw_parse_sink_event),
(gst_raw_parse_handle_seek_push), (gst_raw_parse_handle_seek_pull),
(gst_raw_parse_src_event), (gst_raw_parse_src_query):
* gst/rawparse/gstrawparse.h:
Implement pull mode.
2008-01-18 Sebastian Dröge <slomo@circular-chaos.org>
* gst/multifile/gstmultifilesrc.c: (gst_multi_file_src_create):

2
common

@ -1 +1 @@
Subproject commit a78a9496c0c8cd815dda3d8d6127f0a654665abf
Subproject commit b6bd1a35b641237d016496039e474dee4230de76

View file

@ -35,6 +35,12 @@
static void gst_raw_parse_dispose (GObject * object);
static gboolean gst_raw_parse_sink_activate (GstPad * sinkpad);
static gboolean gst_raw_parse_sink_activatepull (GstPad * sinkpad,
gboolean active);
static void gst_raw_parse_loop (GstElement * element);
static GstStateChangeReturn gst_raw_parse_change_state (GstElement * element,
GstStateChange transition);
static GstFlowReturn gst_raw_parse_chain (GstPad * pad, GstBuffer * buffer);
static gboolean gst_raw_parse_sink_event (GstPad * pad, GstEvent * event);
static gboolean gst_raw_parse_src_event (GstPad * pad, GstEvent * event);
@ -44,6 +50,8 @@ static gboolean gst_raw_parse_convert (GstRawParse * rp,
GstFormat src_format, gint64 src_value,
GstFormat dest_format, gint64 * dest_value);
static void gst_raw_parse_reset (GstRawParse * rp);
static GstStaticPadTemplate gst_raw_parse_sink_pad_template =
GST_STATIC_PAD_TEMPLATE ("sink",
GST_PAD_SINK,
@ -71,8 +79,12 @@ static void
gst_raw_parse_class_init (GstRawParseClass * klass)
{
GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
gobject_class->dispose = gst_raw_parse_dispose;
gstelement_class->change_state =
GST_DEBUG_FUNCPTR (gst_raw_parse_change_state);
}
static void
@ -86,8 +98,14 @@ gst_raw_parse_init (GstRawParse * rp, GstRawParseClass * g_class)
"sink");
gst_element_add_pad (GST_ELEMENT (rp), rp->sinkpad);
gst_pad_set_chain_function (rp->sinkpad, gst_raw_parse_chain);
gst_pad_set_event_function (rp->sinkpad, gst_raw_parse_sink_event);
gst_pad_set_chain_function (rp->sinkpad,
GST_DEBUG_FUNCPTR (gst_raw_parse_chain));
gst_pad_set_event_function (rp->sinkpad,
GST_DEBUG_FUNCPTR (gst_raw_parse_sink_event));
gst_pad_set_activate_function (rp->sinkpad,
GST_DEBUG_FUNCPTR (gst_raw_parse_sink_activate));
gst_pad_set_activatepull_function (rp->sinkpad,
GST_DEBUG_FUNCPTR (gst_raw_parse_sink_activatepull));
src_pad_template = gst_element_class_get_pad_template (element_class, "src");
@ -100,16 +118,21 @@ gst_raw_parse_init (GstRawParse * rp, GstRawParseClass * g_class)
gst_element_add_pad (GST_ELEMENT (rp), rp->srcpad);
gst_pad_set_event_function (rp->srcpad, gst_raw_parse_src_event);
gst_pad_set_event_function (rp->srcpad,
GST_DEBUG_FUNCPTR (gst_raw_parse_src_event));
gst_pad_set_query_type_function (rp->srcpad, gst_raw_parse_src_query_type);
gst_pad_set_query_function (rp->srcpad, gst_raw_parse_src_query);
gst_pad_set_query_type_function (rp->srcpad,
GST_DEBUG_FUNCPTR (gst_raw_parse_src_query_type));
gst_pad_set_query_function (rp->srcpad,
GST_DEBUG_FUNCPTR (gst_raw_parse_src_query));
rp->adapter = gst_adapter_new ();
rp->fps_n = 1;
rp->fps_d = 0;
rp->framesize = 1;
gst_raw_parse_reset (rp);
}
static void
@ -153,19 +176,81 @@ static void
gst_raw_parse_reset (GstRawParse * rp)
{
rp->n_frames = 0;
rp->offset = 0;
rp->discont = TRUE;
rp->upstream_length = 0;
gst_segment_init (&rp->segment, GST_FORMAT_TIME);
rp->need_newsegment = TRUE;
gst_adapter_clear (rp->adapter);
}
static gboolean
gst_raw_parse_set_src_caps (GstRawParse * rp)
{
GstRawParseClass *rp_class = GST_RAW_PARSE_GET_CLASS (rp);
GstCaps *caps;
if (rp->negotiated)
return TRUE;
if (rp_class->get_caps) {
caps = rp_class->get_caps (rp);
} else {
GST_WARNING
("Subclass doesn't implement get_caps() method, using ANY caps");
caps = gst_caps_new_any ();
}
rp->negotiated = gst_pad_set_caps (rp->srcpad, caps);
return rp->negotiated;
}
static GstFlowReturn
gst_raw_parse_push_buffer (GstRawParse * rp, GstBuffer * buffer)
{
GstFlowReturn ret;
gint nframes;
nframes = GST_BUFFER_SIZE (buffer) / rp->framesize;
if (rp->fps_n) {
GST_BUFFER_TIMESTAMP (buffer) = rp->segment.start +
gst_util_uint64_scale (rp->n_frames, GST_SECOND * rp->fps_d, rp->fps_n);
GST_BUFFER_DURATION (buffer) =
gst_util_uint64_scale (nframes * GST_SECOND, rp->fps_d, rp->fps_n);
} else {
GST_BUFFER_TIMESTAMP (buffer) = rp->segment.start;
GST_BUFFER_DURATION (buffer) = GST_CLOCK_TIME_NONE;
}
gst_buffer_set_caps (buffer, GST_PAD_CAPS (rp->srcpad));
if (rp->discont) {
GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
rp->discont = FALSE;
}
rp->offset += GST_BUFFER_SIZE (buffer);
rp->n_frames += nframes;
rp->segment.last_stop = GST_BUFFER_TIMESTAMP (buffer);
GST_LOG_OBJECT (rp, "Pushing buffer with time %" GST_TIME_FORMAT,
GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)));
ret = gst_pad_push (rp->srcpad, buffer);
return ret;
}
static GstFlowReturn
gst_raw_parse_chain (GstPad * pad, GstBuffer * buffer)
{
GstRawParse *rp = GST_RAW_PARSE (gst_pad_get_parent (pad));
GstFlowReturn ret = GST_FLOW_OK;
GstRawParseClass *rp_class = GST_RAW_PARSE_GET_CLASS (rp);
guint buffersize, nframes;
guint buffersize;
if (G_UNLIKELY (GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_DISCONT))) {
GST_DEBUG_OBJECT (rp, "received DISCONT buffer");
@ -173,55 +258,22 @@ gst_raw_parse_chain (GstPad * pad, GstBuffer * buffer)
rp->discont = TRUE;
}
if (!rp->negotiated) {
GstCaps *caps;
if (rp_class->get_caps) {
caps = rp_class->get_caps (rp);
} else {
GST_WARNING
("Subclass doesn't implement get_caps() method, using ANY caps");
caps = gst_caps_new_any ();
}
rp->negotiated = gst_pad_set_caps (rp->srcpad, caps);
}
g_return_val_if_fail (rp->negotiated, GST_FLOW_ERROR);
g_return_val_if_fail (gst_raw_parse_set_src_caps (rp), GST_FLOW_ERROR);
gst_adapter_push (rp->adapter, buffer);
if (rp_class->multiple_frames_per_buffer) {
buffersize = gst_adapter_available (rp->adapter);
buffersize -= buffersize % rp->framesize;
nframes = buffersize / rp->framesize;
} else {
buffersize = rp->framesize;
nframes = 1;
}
while (gst_adapter_available (rp->adapter) >= buffersize) {
buffer = gst_adapter_take_buffer (rp->adapter, buffersize);
if (rp->fps_n) {
GST_BUFFER_TIMESTAMP (buffer) = rp->segment.start +
gst_util_uint64_scale (rp->n_frames, GST_SECOND * rp->fps_d,
rp->fps_n);
GST_BUFFER_DURATION (buffer) =
gst_util_uint64_scale (nframes * GST_SECOND, rp->fps_d, rp->fps_n);
} else {
GST_BUFFER_TIMESTAMP (buffer) = rp->segment.start;
GST_BUFFER_DURATION (buffer) = GST_CLOCK_TIME_NONE;
}
gst_buffer_set_caps (buffer, GST_PAD_CAPS (rp->srcpad));
if (rp->discont) {
GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
rp->discont = FALSE;
}
ret = gst_raw_parse_push_buffer (rp, buffer);
rp->n_frames += nframes;
ret = gst_pad_push (rp->srcpad, buffer);
if (ret != GST_FLOW_OK)
break;
}
@ -230,6 +282,167 @@ gst_raw_parse_chain (GstPad * pad, GstBuffer * buffer)
return ret;
}
static void
gst_raw_parse_loop (GstElement * element)
{
GstRawParse *rp = GST_RAW_PARSE (element);
GstRawParseClass *rp_class = GST_RAW_PARSE_GET_CLASS (rp);
GstFlowReturn ret;
GstBuffer *buffer;
gint size;
if (!gst_raw_parse_set_src_caps (rp)) {
ret = GST_FLOW_ERROR;
goto pause;
}
if (rp_class->multiple_frames_per_buffer)
size = 1024 * rp->framesize;
else
size = rp->framesize;
if (rp->offset + size > rp->upstream_length) {
GstFormat fmt = GST_FORMAT_BYTES;
if (!gst_pad_query_peer_duration (rp->sinkpad, &fmt, &rp->upstream_length)
|| rp->upstream_length < rp->offset + rp->framesize) {
ret = GST_FLOW_UNEXPECTED;
goto pause;
}
if (rp->offset + size > rp->upstream_length) {
size = rp->upstream_length - rp->offset;
size -= size % rp->framesize;
}
}
ret = gst_pad_pull_range (rp->sinkpad, rp->offset, size, &buffer);
if (GST_FLOW_IS_FATAL (ret)) {
GST_DEBUG_OBJECT (rp, "pull_range (%" G_GINT64_FORMAT ", %u) "
"failed, flow: %s", rp->offset, size, gst_flow_get_name (ret));
buffer = NULL;
goto pause;
}
if (GST_BUFFER_SIZE (buffer) < size) {
GST_DEBUG_OBJECT (rp, "Short read at offset %" G_GINT64_FORMAT
", got only %u of %u bytes", rp->offset, GST_BUFFER_SIZE (buffer),
size);
gst_buffer_unref (buffer);
buffer = NULL;
ret = GST_FLOW_UNEXPECTED;
goto pause;
}
if (rp->need_newsegment) {
GST_DEBUG_OBJECT (rp, "sending newsegment from %" GST_TIME_FORMAT
" to %" GST_TIME_FORMAT, GST_TIME_ARGS (rp->segment.start),
GST_TIME_ARGS (rp->segment.stop));
if (gst_pad_push_event (rp->srcpad, gst_event_new_new_segment (FALSE,
rp->segment.rate, GST_FORMAT_TIME, rp->segment.start,
rp->segment.stop, rp->segment.last_stop)));
rp->need_newsegment = FALSE;
}
ret = gst_raw_parse_push_buffer (rp, buffer);
if (GST_FLOW_IS_FATAL (ret))
goto pause;
return;
pause:
{
const gchar *reason = gst_flow_get_name (ret);
GST_LOG_OBJECT (rp, "pausing task, reason %s", reason);
gst_pad_pause_task (rp->sinkpad);
if (GST_FLOW_IS_FATAL (ret) || ret == GST_FLOW_NOT_LINKED) {
if (ret == GST_FLOW_UNEXPECTED && rp->srcpad) {
if (rp->segment.flags & GST_SEEK_FLAG_SEGMENT) {
GstClockTime stop;
GST_LOG_OBJECT (rp, "Sending segment done");
if ((stop = rp->segment.stop) == -1)
stop = rp->segment.duration;
gst_element_post_message (GST_ELEMENT_CAST (rp),
gst_message_new_segment_done (GST_OBJECT_CAST (rp),
rp->segment.format, stop));
} else {
GST_LOG_OBJECT (rp, "Sending EOS, at end of stream");
gst_pad_push_event (rp->srcpad, gst_event_new_eos ());
}
} else {
GST_ELEMENT_ERROR (rp, STREAM, FAILED,
("Internal data stream error."),
("stream stopped, reason %s", reason));
if (rp->srcpad)
gst_pad_push_event (rp->srcpad, gst_event_new_eos ());
}
}
return;
}
}
static gboolean
gst_raw_parse_sink_activate (GstPad * sinkpad)
{
if (gst_pad_check_pull_range (sinkpad)) {
GST_RAW_PARSE (GST_PAD_PARENT (sinkpad))->mode = GST_ACTIVATE_PULL;
return gst_pad_activate_pull (sinkpad, TRUE);
} else {
GST_RAW_PARSE (GST_PAD_PARENT (sinkpad))->mode = GST_ACTIVATE_PUSH;
return gst_pad_activate_push (sinkpad, TRUE);
}
}
static gboolean
gst_raw_parse_sink_activatepull (GstPad * sinkpad, gboolean active)
{
gboolean result;
if (active) {
result = gst_pad_start_task (sinkpad,
(GstTaskFunction) gst_raw_parse_loop, GST_PAD_PARENT (sinkpad));
} else {
result = gst_pad_stop_task (sinkpad);
}
return result;
}
static GstStateChangeReturn
gst_raw_parse_change_state (GstElement * element, GstStateChange transition)
{
GstRawParse *rp = GST_RAW_PARSE (element);
GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
switch (transition) {
case GST_STATE_CHANGE_READY_TO_PAUSED:
gst_segment_init (&rp->segment, GST_FORMAT_TIME);
rp->segment.last_stop = 0;
default:
break;
}
if (GST_ELEMENT_CLASS (parent_class)->change_state)
ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
switch (transition) {
case GST_STATE_CHANGE_PAUSED_TO_READY:
gst_raw_parse_reset (rp);
break;
default:
break;
}
return ret;
}
static gboolean
gst_raw_parse_convert (GstRawParse * rp,
GstFormat src_format, gint64 src_value,
@ -338,7 +551,9 @@ gst_raw_parse_sink_event (GstPad * pad, GstEvent * event)
gboolean ret;
switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_EOS:
case GST_EVENT_FLUSH_STOP:
/* Only happens in push mode */
gst_raw_parse_reset (rp);
ret = gst_pad_push_event (rp->srcpad, event);
break;
@ -349,6 +564,8 @@ gst_raw_parse_sink_event (GstPad * pad, GstEvent * event)
gboolean update;
GstFormat format;
/* Only happens in push mode */
gst_event_parse_new_segment_full (event, &update, &rate, &arate, &format,
&start, &stop, &time);
@ -380,6 +597,7 @@ gst_raw_parse_sink_event (GstPad * pad, GstEvent * event)
if (ret) {
rp->n_frames = 0;
rp->offset = 0;
rp->discont = TRUE;
gst_adapter_clear (rp->adapter);
}
@ -395,6 +613,138 @@ gst_raw_parse_sink_event (GstPad * pad, GstEvent * event)
return ret;
}
static gboolean
gst_raw_parse_handle_seek_push (GstRawParse * rp, GstEvent * event)
{
GstFormat format;
gdouble rate;
GstSeekFlags flags;
GstSeekType start_type, stop_type;
gint64 start, stop;
gboolean ret = FALSE;
gst_event_parse_seek (event, &rate, &format, &flags, &start_type, &start,
&stop_type, &stop);
/* First try if upstream handles the seek */
ret = gst_pad_push_event (rp->sinkpad, event);
if (ret)
return ret;
gst_event_unref (event);
/* Otherwise convert to bytes and push upstream */
if (format == GST_FORMAT_TIME || format == GST_FORMAT_DEFAULT) {
ret = gst_raw_parse_convert (rp, format, start, GST_FORMAT_BYTES, &start);
ret &= gst_raw_parse_convert (rp, format, stop, GST_FORMAT_BYTES, &stop);
if (ret) {
/* Seek on a frame boundary */
start -= start % rp->framesize;
if (stop != -1)
stop += rp->framesize - stop % rp->framesize;
event =
gst_event_new_seek (rate, GST_FORMAT_BYTES, flags, start_type,
start, stop_type, stop);
ret = gst_pad_push_event (rp->sinkpad, event);
}
}
return ret;
}
static gboolean
gst_raw_parse_handle_seek_pull (GstRawParse * rp, GstEvent * event)
{
GstFormat format;
gdouble rate;
GstSeekFlags flags;
GstSeekType start_type, stop_type;
gint64 start, stop;
gint64 start_byte, stop_byte;
gint64 last_stop;
gboolean ret = FALSE;
gboolean flush;
GstFormat fmt = GST_FORMAT_BYTES;
GstSegment segment;
gst_event_parse_seek (event, &rate, &format, &flags, &start_type, &start,
&stop_type, &stop);
gst_event_unref (event);
if (format != GST_FORMAT_TIME && format != GST_FORMAT_DEFAULT) {
GST_DEBUG ("seeking is only supported in TIME or DEFAULT format");
return FALSE;
}
GST_OBJECT_LOCK (rp);
if (stop == -1 && !gst_pad_query_peer_duration (rp->sinkpad, &fmt, &stop))
stop = -1;
ret =
gst_raw_parse_convert (rp, format, start, GST_FORMAT_BYTES, &start_byte);
ret &= gst_raw_parse_convert (rp, format, stop, GST_FORMAT_BYTES, &stop_byte);
if (ret) {
/* Seek on a frame boundary */
start_byte -= start_byte % rp->framesize;
if (stop_byte != -1)
stop_byte += rp->framesize - stop_byte % rp->framesize;
flush = ((flags & GST_SEEK_FLAG_FLUSH) != 0);
segment = rp->segment;
gst_segment_set_seek (&segment, rate, GST_FORMAT_TIME, flags, start_type,
start, stop_type, stop, NULL);
gst_pad_push_event (rp->sinkpad, gst_event_new_flush_start ());
if (flush)
gst_pad_push_event (rp->srcpad, gst_event_new_flush_start ());
else
gst_pad_pause_task (rp->sinkpad);
GST_PAD_STREAM_LOCK (rp->sinkpad);
last_stop = rp->segment.last_stop;
gst_pad_push_event (rp->sinkpad, gst_event_new_flush_stop ());
if (flush)
gst_pad_push_event (rp->srcpad, gst_event_new_flush_stop ());
GST_DEBUG_OBJECT (rp, "Performing seek to %" GST_TIME_FORMAT ", byte %"
G_GINT64_FORMAT, GST_TIME_ARGS (segment.start), start_byte);
rp->offset = start_byte;
rp->segment = segment;
rp->segment.last_stop = start;
rp->need_newsegment = TRUE;
rp->discont = (last_stop != start) ? TRUE : FALSE;
if (rp->segment.flags & GST_SEEK_FLAG_SEGMENT) {
gst_element_post_message (GST_ELEMENT_CAST (rp),
gst_message_new_segment_start (GST_OBJECT_CAST (rp),
rp->segment.format, rp->segment.last_stop));
}
GST_PAD_STREAM_UNLOCK (rp->sinkpad);
} else {
GST_DEBUG_OBJECT (rp, "Seek failed: couldn't convert to byte positions");
}
GST_OBJECT_UNLOCK (rp);
gst_pad_start_task (rp->sinkpad, (GstTaskFunction) gst_raw_parse_loop, rp);
return ret;
}
static gboolean
gst_raw_parse_src_event (GstPad * pad, GstEvent * event)
{
@ -402,51 +752,17 @@ gst_raw_parse_src_event (GstPad * pad, GstEvent * event)
gboolean ret;
switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_SEEK:{
GstFormat format;
gdouble rate;
GstSeekFlags flags;
GstSeekType start_type, stop_type;
gint64 start, stop;
gst_event_parse_seek (event, &rate, &format, &flags, &start_type, &start,
&stop_type, &stop);
/* First try if upstream handles the seek */
ret = gst_pad_push_event (rp->sinkpad, event);
if (ret)
goto done;
/* Otherwise convert to bytes and push upstream */
if (format == GST_FORMAT_TIME || format == GST_FORMAT_DEFAULT) {
gst_event_unref (event);
ret =
gst_raw_parse_convert (rp, format, start, GST_FORMAT_BYTES, &start);
ret &=
gst_raw_parse_convert (rp, format, stop, GST_FORMAT_BYTES, &stop);
if (ret) {
/* Seek on a frame boundary */
start -= start % rp->framesize;
if (stop != -1)
stop += rp->framesize - stop % rp->framesize;
event =
gst_event_new_seek (rate, GST_FORMAT_BYTES, flags, start_type,
start, stop_type, stop);
ret = gst_pad_push_event (rp->sinkpad, event);
}
}
case GST_EVENT_SEEK:
if (rp->mode == GST_ACTIVATE_PUSH)
ret = gst_raw_parse_handle_seek_push (rp, event);
else
ret = gst_raw_parse_handle_seek_pull (rp, event);
break;
}
default:
ret = gst_pad_event_default (rp->srcpad, event);
break;
}
done:
gst_object_unref (rp);
return ret;
@ -483,8 +799,7 @@ gst_raw_parse_src_query (GstPad * pad, GstQuery * query)
gst_query_parse_position (query, &format, NULL);
time = gst_util_uint64_scale (rp->n_frames,
GST_SECOND * rp->fps_d, rp->fps_n);
time = rp->segment.last_stop;
ret = gst_raw_parse_convert (rp, GST_FORMAT_TIME, time, format, &value);
gst_query_set_position (query, format, value);

View file

@ -52,6 +52,7 @@ struct _GstRawParse
GstPad *sinkpad;
GstPad *srcpad;
GstActivateMode mode;
GstAdapter *adapter;
int framesize;
@ -60,11 +61,14 @@ struct _GstRawParse
gboolean discont;
guint64 n_frames;
gint64 offset;
GstSegment segment;
gint64 upstream_length;
gboolean negotiated;
gboolean have_new_segment;
gboolean need_newsegment;
};
struct _GstRawParseClass