diff --git a/ChangeLog b/ChangeLog index 56138f8a65..bcb49c601e 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,26 @@ +2005-02-22 Andy Wingo + + * gst/elements/gstidentity.h + * gst/elements/gstidentity.c (gst_identity_handle_buffer): New + proc, factored out of the old chain func. + (gst_identity_chain): Use handle_buffer. Lock the stream. + (gst_identity_getrange): Lock the stream. Still doesn't do any + reporting tho. + (gst_identity_event): Handle flush events in the loop-based and + decoupled cases. + (identity_queue_flush, identity_queue_pop, identity_queue_push): + New procs, implement a 1-data buffer pen between threads in + decoupled operation. + (gst_identity_class_init, gst_identity_get_property) + (gst_identity_set_property): Use PROP_FOO instead of ARG_FOO. It's + not null if we get it, but I might remove five year old code. Add + has-src-loop, has-sink-loop, has-chain, has-getrange properties, + remove loop-based. + (gst_identity_finalize): Free mutex and cond. + (gst_identity_init): Alloc mutex and cond. + (gst_identity_sink_loop, gst_identity_src_loop): New procs. + (gst_identity_set_dataflow_funcs): New proc. + 2005-02-21 Wim Taymans * gst/elements/gstfakesink.c: (gst_fakesink_get_property), @@ -23,7 +46,6 @@ preroll sample is queued. Add more info in gst-launch regarding state changes. - 2005-02-21 Andy Wingo * gst/elements/gstfakesink.c: diff --git a/gst/elements/gstidentity.c b/gst/elements/gstidentity.c index f59482f417..1d7d980985 100644 --- a/gst/elements/gstidentity.c +++ b/gst/elements/gstidentity.c @@ -72,21 +72,28 @@ enum enum { - ARG_0, - ARG_LOOP_BASED, - ARG_SLEEP_TIME, - ARG_DUPLICATE, - ARG_ERROR_AFTER, - ARG_DROP_PROBABILITY, - ARG_DATARATE, - ARG_SILENT, - ARG_LAST_MESSAGE, - ARG_DUMP, - ARG_SYNC, - ARG_CHECK_PERFECT + PROP_0, + PROP_HAS_GETRANGE, + PROP_HAS_CHAIN, + PROP_HAS_SINK_LOOP, + PROP_HAS_SRC_LOOP, + PROP_LOOP_BASED, + PROP_SLEEP_TIME, + PROP_DUPLICATE, + PROP_ERROR_AFTER, + PROP_DROP_PROBABILITY, + PROP_DATARATE, + PROP_SILENT, + PROP_LAST_MESSAGE, + PROP_DUMP, + PROP_SYNC, + PROP_CHECK_PERFECT }; +typedef GstFlowReturn (*IdentityPushFunc) (GstIdentity *, GstBuffer *); + + #define _do_init(bla) \ GST_DEBUG_CATEGORY_INIT (gst_identity_debug, "identity", 0, "identity element"); @@ -101,9 +108,13 @@ static void gst_identity_get_property (GObject * object, guint prop_id, static GstElementStateReturn gst_identity_change_state (GstElement * element); static gboolean gst_identity_event (GstPad * pad, GstEvent * event); -static GstFlowReturn gst_identity_chain (GstPad * pad, GstBuffer * buffer); static GstFlowReturn gst_identity_getrange (GstPad * pad, guint64 offset, guint length, GstBuffer ** buffer); +static GstFlowReturn gst_identity_chain (GstPad * pad, GstBuffer * buffer); +static void gst_identity_src_loop (GstPad * pad); +static void gst_identity_sink_loop (GstPad * pad); +static GstFlowReturn gst_identity_handle_buffer (GstIdentity * identity, + GstBuffer * buf); static void gst_identity_set_clock (GstElement * element, GstClock * clock); static GstCaps *gst_identity_proxy_getcaps (GstPad * pad); @@ -129,6 +140,9 @@ gst_identity_finalize (GObject * object) identity = GST_IDENTITY (object); + g_mutex_free (identity->pen_lock); + g_cond_free (identity->pen_cond); + g_free (identity->last_message); G_OBJECT_CLASS (parent_class)->finalize (object); @@ -146,42 +160,54 @@ gst_identity_class_init (GstIdentityClass * klass) gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_identity_set_property); gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_identity_get_property); - g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_LOOP_BASED, - g_param_spec_boolean ("loop-based", "Loop-based", - "Set to TRUE to use loop-based rather than chain-based scheduling", - DEFAULT_LOOP_BASED, G_PARAM_READWRITE)); - g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_SLEEP_TIME, + g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_HAS_GETRANGE, + g_param_spec_boolean ("has-getrange", "Has getrange", + "If the src pad will implement a getrange function", + TRUE, G_PARAM_READWRITE | G_PARAM_CONSTRUCT)); + g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_HAS_CHAIN, + g_param_spec_boolean ("has-chain", "Has chain", + "If the sink pad will implement a chain function", + TRUE, G_PARAM_READWRITE | G_PARAM_CONSTRUCT)); + g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_HAS_SRC_LOOP, + g_param_spec_boolean ("has-src-loop", "Has src loop", + "If the src pad will implement a loop function", + FALSE, G_PARAM_READWRITE | G_PARAM_CONSTRUCT)); + g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_HAS_SINK_LOOP, + g_param_spec_boolean ("has-sink-loop", "Has sink loop", + "If the sink pad will implement a loop function", + FALSE, G_PARAM_READWRITE | G_PARAM_CONSTRUCT)); + g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_SLEEP_TIME, g_param_spec_uint ("sleep-time", "Sleep time", "Microseconds to sleep between processing", 0, G_MAXUINT, DEFAULT_SLEEP_TIME, G_PARAM_READWRITE)); - g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_DUPLICATE, + g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_DUPLICATE, g_param_spec_uint ("duplicate", "Duplicate Buffers", "Push the buffers N times", 0, G_MAXUINT, DEFAULT_DUPLICATE, G_PARAM_READWRITE)); - g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_ERROR_AFTER, + g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_ERROR_AFTER, g_param_spec_int ("error_after", "Error After", "Error after N buffers", G_MININT, G_MAXINT, DEFAULT_ERROR_AFTER, G_PARAM_READWRITE)); - g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_DROP_PROBABILITY, - g_param_spec_float ("drop_probability", "Drop Probability", - "The Probability a buffer is dropped", 0.0, 1.0, + g_object_class_install_property (G_OBJECT_CLASS (klass), + PROP_DROP_PROBABILITY, g_param_spec_float ("drop_probability", + "Drop Probability", "The Probability a buffer is dropped", 0.0, 1.0, DEFAULT_DROP_PROBABILITY, G_PARAM_READWRITE)); - g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_DATARATE, + g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_DATARATE, g_param_spec_int ("datarate", "Datarate", "(Re)timestamps buffers with number of bytes per second (0 = inactive)", 0, G_MAXINT, DEFAULT_DATARATE, G_PARAM_READWRITE)); - g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_SILENT, + g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_SILENT, g_param_spec_boolean ("silent", "silent", "silent", DEFAULT_SILENT, G_PARAM_READWRITE)); - g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_LAST_MESSAGE, + g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_LAST_MESSAGE, g_param_spec_string ("last-message", "last-message", "last-message", NULL, G_PARAM_READABLE)); - g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_DUMP, + g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_DUMP, g_param_spec_boolean ("dump", "Dump", "Dump buffer contents", DEFAULT_DUMP, G_PARAM_READWRITE)); - g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_SYNC, + g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_SYNC, g_param_spec_boolean ("sync", "Synchronize", "Synchronize to pipeline clock", DEFAULT_SYNC, G_PARAM_READWRITE)); - g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_CHECK_PERFECT, + g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_CHECK_PERFECT, g_param_spec_boolean ("check-perfect", "Check For Perfect Stream", "Verify that the stream is time- and data-contiguous", DEFAULT_CHECK_PERFECT, G_PARAM_READWRITE)); @@ -207,8 +233,6 @@ gst_identity_init (GstIdentity * identity) gst_pad_new_from_template (gst_static_pad_template_get (&sinktemplate), "sink"); gst_element_add_pad (GST_ELEMENT (identity), identity->sinkpad); - gst_pad_set_chain_function (identity->sinkpad, - GST_DEBUG_FUNCPTR (gst_identity_chain)); gst_pad_set_getcaps_function (identity->sinkpad, GST_DEBUG_FUNCPTR (gst_identity_proxy_getcaps)); gst_pad_set_event_function (identity->sinkpad, @@ -223,7 +247,6 @@ gst_identity_init (GstIdentity * identity) GST_DEBUG_FUNCPTR (gst_identity_getrange)); gst_element_add_pad (GST_ELEMENT (identity), identity->srcpad); - identity->loop_based = DEFAULT_LOOP_BASED; identity->sleep_time = DEFAULT_SLEEP_TIME; identity->duplicate = DEFAULT_DUPLICATE; identity->error_after = DEFAULT_ERROR_AFTER; @@ -235,6 +258,11 @@ gst_identity_init (GstIdentity * identity) identity->dump = DEFAULT_DUMP; identity->last_message = NULL; identity->srccaps = NULL; + + identity->pen_data = NULL; + identity->pen_lock = g_mutex_new (); + identity->pen_cond = g_cond_new (); + identity->pen_flushing = FALSE; } static void @@ -256,13 +284,62 @@ gst_identity_proxy_getcaps (GstPad * pad) return gst_pad_peer_get_caps (otherpad); } +static gboolean +identity_queue_push (GstIdentity * identity, GstData * data) +{ + gboolean ret; + + g_mutex_lock (identity->pen_lock); + while (identity->pen_data && !identity->pen_flushing) + g_cond_wait (identity->pen_cond, identity->pen_lock); + if (identity->pen_flushing) { + gst_data_unref (identity->pen_data); + identity->pen_data = NULL; + gst_data_unref (data); + ret = FALSE; + } else { + identity->pen_data = data; + ret = TRUE; + } + g_cond_signal (identity->pen_cond); + g_mutex_unlock (identity->pen_lock); + + return ret; +} + +static GstData * +identity_queue_pop (GstIdentity * identity) +{ + GstData *ret; + + g_mutex_lock (identity->pen_lock); + while (!(ret = identity->pen_data) && !identity->pen_flushing) + g_cond_wait (identity->pen_cond, identity->pen_lock); + g_cond_signal (identity->pen_cond); + g_mutex_unlock (identity->pen_lock); + + return ret; +} + +static void +identity_queue_flush (GstIdentity * identity) +{ + g_mutex_lock (identity->pen_lock); + identity->pen_flushing = TRUE; + g_cond_signal (identity->pen_cond); + g_mutex_unlock (identity->pen_lock); +} + static gboolean gst_identity_event (GstPad * pad, GstEvent * event) { GstIdentity *identity; + gboolean ret; identity = GST_IDENTITY (GST_PAD_PARENT (pad)); + GST_STREAM_LOCK (pad); + if (!identity->silent) { g_free (identity->last_message); @@ -272,7 +349,49 @@ gst_identity_event (GstPad * pad, GstEvent * event) g_object_notify (G_OBJECT (identity), "last_message"); } - return gst_pad_push_event (identity->srcpad, event); + + switch (GST_EVENT_TYPE (event)) { + case GST_EVENT_FLUSH: + g_print ("identity received flush event\n"); + /* forward event */ + gst_pad_event_default (pad, event); + if (GST_EVENT_FLUSH_DONE (event)) { + if (identity->sink_mode == GST_ACTIVATE_PULL) { + GST_STREAM_LOCK (identity->sinkpad); + gst_task_start (GST_RPAD_TASK (identity->sinkpad)); + GST_STREAM_UNLOCK (identity->sinkpad); + } + if (identity->src_mode == GST_ACTIVATE_PUSH) { + GST_STREAM_LOCK (identity->srcpad); + gst_task_start (GST_RPAD_TASK (identity->srcpad)); + GST_STREAM_UNLOCK (identity->srcpad); + } + } else { + /* unblock both functions */ + identity_queue_flush (identity); + + g_print ("identity after flush\n"); + } + ret = TRUE; + goto done; + case GST_EVENT_EOS: + g_print ("identity got eos\n"); + break; + default: + g_print ("identity got event %p of type %d\n", event, + GST_EVENT_TYPE (event)); + break; + } + + if (identity->decoupled) { + ret = identity_queue_push (identity, (GstData *) event); + } else { + ret = gst_pad_push_event (identity->srcpad, event); + } + +done: + GST_STREAM_UNLOCK (pad); + return ret; } static GstFlowReturn @@ -280,26 +399,106 @@ gst_identity_getrange (GstPad * pad, guint64 offset, guint length, GstBuffer ** buffer) { GstIdentity *identity; + GstFlowReturn ret; identity = GST_IDENTITY (GST_PAD_PARENT (pad)); - return gst_pad_pull_range (identity->sinkpad, offset, length, buffer); + GST_STREAM_LOCK (pad); + + ret = gst_pad_pull_range (identity->sinkpad, offset, length, buffer); + + GST_STREAM_UNLOCK (pad); + + return ret; } static GstFlowReturn gst_identity_chain (GstPad * pad, GstBuffer * buffer) { - GstBuffer *buf = GST_BUFFER (buffer); GstIdentity *identity; - GstFlowReturn result = GST_FLOW_OK; - guint i; - - g_return_val_if_fail (pad != NULL, GST_FLOW_ERROR); - g_return_val_if_fail (GST_IS_PAD (pad), GST_FLOW_ERROR); - g_return_val_if_fail (buf != NULL, GST_FLOW_ERROR); + GstFlowReturn ret = GST_FLOW_OK; identity = GST_IDENTITY (GST_PAD_PARENT (pad)); + GST_STREAM_LOCK (pad); + + ret = gst_identity_handle_buffer (identity, buffer); + + GST_STREAM_UNLOCK (pad); + + return ret; +} + +#define DEFAULT_PULL_SIZE 1024 + +static void +gst_identity_sink_loop (GstPad * pad) +{ + GstIdentity *identity; + GstBuffer *buffer; + GstFlowReturn ret; + + identity = GST_IDENTITY (GST_PAD_PARENT (pad)); + + GST_STREAM_LOCK (pad); + + ret = gst_pad_pull_range (pad, identity->offset, DEFAULT_PULL_SIZE, &buffer); + if (ret != GST_FLOW_OK) + goto sink_loop_pause; + + ret = gst_identity_handle_buffer (identity, buffer); + if (ret != GST_FLOW_OK) + goto sink_loop_pause; + + GST_STREAM_UNLOCK (pad); + return; + +sink_loop_pause: + gst_task_pause (GST_RPAD_TASK (identity->sinkpad)); + GST_STREAM_UNLOCK (pad); + return; +} + +static void +gst_identity_src_loop (GstPad * pad) +{ + GstIdentity *identity; + GstData *data; + GstFlowReturn ret; + + identity = GST_IDENTITY (GST_PAD_PARENT (pad)); + + GST_STREAM_LOCK (pad); + + data = identity_queue_pop (identity); + if (!data) /* we're getting flushed */ + goto src_loop_pause; + + if (GST_IS_EVENT (data)) { + if (GST_EVENT_TYPE (data) == GST_EVENT_EOS) + gst_task_pause (GST_RPAD_TASK (identity->srcpad)); + gst_pad_push_event (identity->srcpad, GST_EVENT (data)); + } else { + ret = gst_pad_push (identity->srcpad, (GstBuffer *) data); + if (ret != GST_FLOW_OK) + goto src_loop_pause; + } + + GST_STREAM_UNLOCK (pad); + return; + +src_loop_pause: + gst_task_pause (GST_RPAD_TASK (identity->srcpad)); + GST_STREAM_UNLOCK (pad); + return; +} + +static GstFlowReturn +gst_identity_handle_buffer (GstIdentity * identity, GstBuffer * buf) +{ + GstFlowReturn ret = GST_FLOW_OK; + guint i; + /* see if we need to do perfect stream checking */ /* invalid timestamp drops us out of check. FIXME: maybe warn ? */ if (identity->check_perfect && @@ -355,6 +554,7 @@ gst_identity_chain (GstPad * pad, GstBuffer * buffer) return GST_FLOW_OK; } } + if (identity->dump) { gst_util_dump_mem (GST_BUFFER_DATA (buf), GST_BUFFER_SIZE (buf)); } @@ -378,7 +578,7 @@ gst_identity_chain (GstPad * pad, GstBuffer * buffer) time = GST_BUFFER_TIMESTAMP (buf); if (identity->datarate > 0) { - time = identity->bytes_handled * GST_SECOND / identity->datarate; + time = identity->offset * GST_SECOND / identity->datarate; GST_BUFFER_TIMESTAMP (buf) = time; GST_BUFFER_DURATION (buf) = @@ -397,13 +597,45 @@ gst_identity_chain (GstPad * pad, GstBuffer * buffer) } } - identity->bytes_handled += GST_BUFFER_SIZE (buf); - result = gst_pad_push (identity->srcpad, buf); + identity->offset += GST_BUFFER_SIZE (buf); + if (identity->decoupled) { + if (!identity_queue_push (identity, (GstData *) buf)) + return GST_FLOW_UNEXPECTED; + } else { + ret = gst_pad_push (identity->srcpad, buf); + if (ret != GST_FLOW_OK) + return ret; + } if (identity->sleep_time) g_usleep (identity->sleep_time); } - return result; + + return ret; +} + +static void +gst_identity_set_dataflow_funcs (GstIdentity * identity) +{ + if (identity->has_getrange) + gst_pad_set_getrange_function (identity->srcpad, gst_identity_getrange); + else + gst_pad_set_getrange_function (identity->srcpad, NULL); + + if (identity->has_chain) + gst_pad_set_chain_function (identity->sinkpad, gst_identity_chain); + else + gst_pad_set_chain_function (identity->sinkpad, NULL); + + if (identity->has_src_loop) + gst_pad_set_loop_function (identity->srcpad, gst_identity_src_loop); + else + gst_pad_set_loop_function (identity->srcpad, NULL); + + if (identity->has_sink_loop) + gst_pad_set_loop_function (identity->sinkpad, gst_identity_sink_loop); + else + gst_pad_set_loop_function (identity->sinkpad, NULL); } static void @@ -412,45 +644,50 @@ gst_identity_set_property (GObject * object, guint prop_id, { GstIdentity *identity; - /* it's not null if we got it, but it might not be ours */ - g_return_if_fail (GST_IS_IDENTITY (object)); - identity = GST_IDENTITY (object); switch (prop_id) { - case ARG_LOOP_BASED: - identity->loop_based = g_value_get_boolean (value); - if (identity->loop_based) { - gst_pad_set_chain_function (identity->sinkpad, NULL); - } else { - gst_pad_set_chain_function (identity->sinkpad, gst_identity_chain); - } + case PROP_HAS_GETRANGE: + identity->has_getrange = g_value_get_boolean (value); + gst_identity_set_dataflow_funcs (identity); break; - case ARG_SLEEP_TIME: + case PROP_HAS_CHAIN: + identity->has_chain = g_value_get_boolean (value); + gst_identity_set_dataflow_funcs (identity); + break; + case PROP_HAS_SRC_LOOP: + identity->has_src_loop = g_value_get_boolean (value); + gst_identity_set_dataflow_funcs (identity); + break; + case PROP_HAS_SINK_LOOP: + identity->has_sink_loop = g_value_get_boolean (value); + gst_identity_set_dataflow_funcs (identity); + break; + case PROP_SLEEP_TIME: identity->sleep_time = g_value_get_uint (value); break; - case ARG_SILENT: + case PROP_SILENT: identity->silent = g_value_get_boolean (value); break; - case ARG_DUPLICATE: + case PROP_DUPLICATE: identity->duplicate = g_value_get_uint (value); break; - case ARG_DUMP: + case PROP_DUMP: identity->dump = g_value_get_boolean (value); break; - case ARG_ERROR_AFTER: + case PROP_ERROR_AFTER: identity->error_after = g_value_get_int (value); break; - case ARG_DROP_PROBABILITY: + case PROP_DROP_PROBABILITY: identity->drop_probability = g_value_get_float (value); break; - case ARG_DATARATE: + case PROP_DATARATE: identity->datarate = g_value_get_int (value); break; - case ARG_SYNC: + case PROP_SYNC: identity->sync = g_value_get_boolean (value); break; - case ARG_CHECK_PERFECT: + case PROP_CHECK_PERFECT: identity->check_perfect = g_value_get_boolean (value); break; default: @@ -465,43 +702,49 @@ gst_identity_get_property (GObject * object, guint prop_id, GValue * value, { GstIdentity *identity; - /* it's not null if we got it, but it might not be ours */ - g_return_if_fail (GST_IS_IDENTITY (object)); - identity = GST_IDENTITY (object); switch (prop_id) { - case ARG_LOOP_BASED: - g_value_set_boolean (value, identity->loop_based); + case PROP_HAS_GETRANGE: + g_value_set_boolean (value, identity->has_getrange); break; - case ARG_SLEEP_TIME: + case PROP_HAS_CHAIN: + g_value_set_boolean (value, identity->has_chain); + break; + case PROP_HAS_SRC_LOOP: + g_value_set_boolean (value, identity->has_src_loop); + break; + case PROP_HAS_SINK_LOOP: + g_value_set_boolean (value, identity->has_sink_loop); + break; + case PROP_SLEEP_TIME: g_value_set_uint (value, identity->sleep_time); break; - case ARG_DUPLICATE: + case PROP_DUPLICATE: g_value_set_uint (value, identity->duplicate); break; - case ARG_ERROR_AFTER: + case PROP_ERROR_AFTER: g_value_set_int (value, identity->error_after); break; - case ARG_DROP_PROBABILITY: + case PROP_DROP_PROBABILITY: g_value_set_float (value, identity->drop_probability); break; - case ARG_DATARATE: + case PROP_DATARATE: g_value_set_int (value, identity->datarate); break; - case ARG_SILENT: + case PROP_SILENT: g_value_set_boolean (value, identity->silent); break; - case ARG_DUMP: + case PROP_DUMP: g_value_set_boolean (value, identity->dump); break; - case ARG_LAST_MESSAGE: + case PROP_LAST_MESSAGE: g_value_set_string (value, identity->last_message); break; - case ARG_SYNC: + case PROP_SYNC: g_value_set_boolean (value, identity->sync); break; - case ARG_CHECK_PERFECT: + case PROP_CHECK_PERFECT: g_value_set_boolean (value, identity->check_perfect); break; default: @@ -523,7 +766,7 @@ gst_identity_change_state (GstElement * element) case GST_STATE_NULL_TO_READY: break; case GST_STATE_READY_TO_PAUSED: - identity->bytes_handled = 0; + identity->offset = 0; identity->prev_timestamp = GST_CLOCK_TIME_NONE; identity->prev_duration = GST_CLOCK_TIME_NONE; identity->prev_offset_end = -1; diff --git a/gst/elements/gstidentity.h b/gst/elements/gstidentity.h index 00203beed5..e187f2e2c2 100644 --- a/gst/elements/gstidentity.h +++ b/gst/elements/gstidentity.h @@ -50,7 +50,19 @@ struct _GstIdentity { GstPad *sinkpad; GstPad *srcpad; - gboolean loop_based; + GstData *pen_data; + GMutex *pen_lock; + GCond *pen_cond; + gboolean pen_flushing; + + gboolean has_chain; + gboolean has_getrange; + gboolean has_src_loop; + gboolean has_sink_loop; + GstActivateMode sink_mode; + GstActivateMode src_mode; + gboolean decoupled; + guint duplicate; gint error_after; gfloat drop_probability; @@ -67,7 +79,7 @@ struct _GstIdentity { gchar *last_message; GstCaps *srccaps; - guint64 bytes_handled; + guint64 offset; }; struct _GstIdentityClass { diff --git a/plugins/elements/gstidentity.c b/plugins/elements/gstidentity.c index f59482f417..1d7d980985 100644 --- a/plugins/elements/gstidentity.c +++ b/plugins/elements/gstidentity.c @@ -72,21 +72,28 @@ enum enum { - ARG_0, - ARG_LOOP_BASED, - ARG_SLEEP_TIME, - ARG_DUPLICATE, - ARG_ERROR_AFTER, - ARG_DROP_PROBABILITY, - ARG_DATARATE, - ARG_SILENT, - ARG_LAST_MESSAGE, - ARG_DUMP, - ARG_SYNC, - ARG_CHECK_PERFECT + PROP_0, + PROP_HAS_GETRANGE, + PROP_HAS_CHAIN, + PROP_HAS_SINK_LOOP, + PROP_HAS_SRC_LOOP, + PROP_LOOP_BASED, + PROP_SLEEP_TIME, + PROP_DUPLICATE, + PROP_ERROR_AFTER, + PROP_DROP_PROBABILITY, + PROP_DATARATE, + PROP_SILENT, + PROP_LAST_MESSAGE, + PROP_DUMP, + PROP_SYNC, + PROP_CHECK_PERFECT }; +typedef GstFlowReturn (*IdentityPushFunc) (GstIdentity *, GstBuffer *); + + #define _do_init(bla) \ GST_DEBUG_CATEGORY_INIT (gst_identity_debug, "identity", 0, "identity element"); @@ -101,9 +108,13 @@ static void gst_identity_get_property (GObject * object, guint prop_id, static GstElementStateReturn gst_identity_change_state (GstElement * element); static gboolean gst_identity_event (GstPad * pad, GstEvent * event); -static GstFlowReturn gst_identity_chain (GstPad * pad, GstBuffer * buffer); static GstFlowReturn gst_identity_getrange (GstPad * pad, guint64 offset, guint length, GstBuffer ** buffer); +static GstFlowReturn gst_identity_chain (GstPad * pad, GstBuffer * buffer); +static void gst_identity_src_loop (GstPad * pad); +static void gst_identity_sink_loop (GstPad * pad); +static GstFlowReturn gst_identity_handle_buffer (GstIdentity * identity, + GstBuffer * buf); static void gst_identity_set_clock (GstElement * element, GstClock * clock); static GstCaps *gst_identity_proxy_getcaps (GstPad * pad); @@ -129,6 +140,9 @@ gst_identity_finalize (GObject * object) identity = GST_IDENTITY (object); + g_mutex_free (identity->pen_lock); + g_cond_free (identity->pen_cond); + g_free (identity->last_message); G_OBJECT_CLASS (parent_class)->finalize (object); @@ -146,42 +160,54 @@ gst_identity_class_init (GstIdentityClass * klass) gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_identity_set_property); gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_identity_get_property); - g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_LOOP_BASED, - g_param_spec_boolean ("loop-based", "Loop-based", - "Set to TRUE to use loop-based rather than chain-based scheduling", - DEFAULT_LOOP_BASED, G_PARAM_READWRITE)); - g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_SLEEP_TIME, + g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_HAS_GETRANGE, + g_param_spec_boolean ("has-getrange", "Has getrange", + "If the src pad will implement a getrange function", + TRUE, G_PARAM_READWRITE | G_PARAM_CONSTRUCT)); + g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_HAS_CHAIN, + g_param_spec_boolean ("has-chain", "Has chain", + "If the sink pad will implement a chain function", + TRUE, G_PARAM_READWRITE | G_PARAM_CONSTRUCT)); + g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_HAS_SRC_LOOP, + g_param_spec_boolean ("has-src-loop", "Has src loop", + "If the src pad will implement a loop function", + FALSE, G_PARAM_READWRITE | G_PARAM_CONSTRUCT)); + g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_HAS_SINK_LOOP, + g_param_spec_boolean ("has-sink-loop", "Has sink loop", + "If the sink pad will implement a loop function", + FALSE, G_PARAM_READWRITE | G_PARAM_CONSTRUCT)); + g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_SLEEP_TIME, g_param_spec_uint ("sleep-time", "Sleep time", "Microseconds to sleep between processing", 0, G_MAXUINT, DEFAULT_SLEEP_TIME, G_PARAM_READWRITE)); - g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_DUPLICATE, + g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_DUPLICATE, g_param_spec_uint ("duplicate", "Duplicate Buffers", "Push the buffers N times", 0, G_MAXUINT, DEFAULT_DUPLICATE, G_PARAM_READWRITE)); - g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_ERROR_AFTER, + g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_ERROR_AFTER, g_param_spec_int ("error_after", "Error After", "Error after N buffers", G_MININT, G_MAXINT, DEFAULT_ERROR_AFTER, G_PARAM_READWRITE)); - g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_DROP_PROBABILITY, - g_param_spec_float ("drop_probability", "Drop Probability", - "The Probability a buffer is dropped", 0.0, 1.0, + g_object_class_install_property (G_OBJECT_CLASS (klass), + PROP_DROP_PROBABILITY, g_param_spec_float ("drop_probability", + "Drop Probability", "The Probability a buffer is dropped", 0.0, 1.0, DEFAULT_DROP_PROBABILITY, G_PARAM_READWRITE)); - g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_DATARATE, + g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_DATARATE, g_param_spec_int ("datarate", "Datarate", "(Re)timestamps buffers with number of bytes per second (0 = inactive)", 0, G_MAXINT, DEFAULT_DATARATE, G_PARAM_READWRITE)); - g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_SILENT, + g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_SILENT, g_param_spec_boolean ("silent", "silent", "silent", DEFAULT_SILENT, G_PARAM_READWRITE)); - g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_LAST_MESSAGE, + g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_LAST_MESSAGE, g_param_spec_string ("last-message", "last-message", "last-message", NULL, G_PARAM_READABLE)); - g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_DUMP, + g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_DUMP, g_param_spec_boolean ("dump", "Dump", "Dump buffer contents", DEFAULT_DUMP, G_PARAM_READWRITE)); - g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_SYNC, + g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_SYNC, g_param_spec_boolean ("sync", "Synchronize", "Synchronize to pipeline clock", DEFAULT_SYNC, G_PARAM_READWRITE)); - g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_CHECK_PERFECT, + g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_CHECK_PERFECT, g_param_spec_boolean ("check-perfect", "Check For Perfect Stream", "Verify that the stream is time- and data-contiguous", DEFAULT_CHECK_PERFECT, G_PARAM_READWRITE)); @@ -207,8 +233,6 @@ gst_identity_init (GstIdentity * identity) gst_pad_new_from_template (gst_static_pad_template_get (&sinktemplate), "sink"); gst_element_add_pad (GST_ELEMENT (identity), identity->sinkpad); - gst_pad_set_chain_function (identity->sinkpad, - GST_DEBUG_FUNCPTR (gst_identity_chain)); gst_pad_set_getcaps_function (identity->sinkpad, GST_DEBUG_FUNCPTR (gst_identity_proxy_getcaps)); gst_pad_set_event_function (identity->sinkpad, @@ -223,7 +247,6 @@ gst_identity_init (GstIdentity * identity) GST_DEBUG_FUNCPTR (gst_identity_getrange)); gst_element_add_pad (GST_ELEMENT (identity), identity->srcpad); - identity->loop_based = DEFAULT_LOOP_BASED; identity->sleep_time = DEFAULT_SLEEP_TIME; identity->duplicate = DEFAULT_DUPLICATE; identity->error_after = DEFAULT_ERROR_AFTER; @@ -235,6 +258,11 @@ gst_identity_init (GstIdentity * identity) identity->dump = DEFAULT_DUMP; identity->last_message = NULL; identity->srccaps = NULL; + + identity->pen_data = NULL; + identity->pen_lock = g_mutex_new (); + identity->pen_cond = g_cond_new (); + identity->pen_flushing = FALSE; } static void @@ -256,13 +284,62 @@ gst_identity_proxy_getcaps (GstPad * pad) return gst_pad_peer_get_caps (otherpad); } +static gboolean +identity_queue_push (GstIdentity * identity, GstData * data) +{ + gboolean ret; + + g_mutex_lock (identity->pen_lock); + while (identity->pen_data && !identity->pen_flushing) + g_cond_wait (identity->pen_cond, identity->pen_lock); + if (identity->pen_flushing) { + gst_data_unref (identity->pen_data); + identity->pen_data = NULL; + gst_data_unref (data); + ret = FALSE; + } else { + identity->pen_data = data; + ret = TRUE; + } + g_cond_signal (identity->pen_cond); + g_mutex_unlock (identity->pen_lock); + + return ret; +} + +static GstData * +identity_queue_pop (GstIdentity * identity) +{ + GstData *ret; + + g_mutex_lock (identity->pen_lock); + while (!(ret = identity->pen_data) && !identity->pen_flushing) + g_cond_wait (identity->pen_cond, identity->pen_lock); + g_cond_signal (identity->pen_cond); + g_mutex_unlock (identity->pen_lock); + + return ret; +} + +static void +identity_queue_flush (GstIdentity * identity) +{ + g_mutex_lock (identity->pen_lock); + identity->pen_flushing = TRUE; + g_cond_signal (identity->pen_cond); + g_mutex_unlock (identity->pen_lock); +} + static gboolean gst_identity_event (GstPad * pad, GstEvent * event) { GstIdentity *identity; + gboolean ret; identity = GST_IDENTITY (GST_PAD_PARENT (pad)); + GST_STREAM_LOCK (pad); + if (!identity->silent) { g_free (identity->last_message); @@ -272,7 +349,49 @@ gst_identity_event (GstPad * pad, GstEvent * event) g_object_notify (G_OBJECT (identity), "last_message"); } - return gst_pad_push_event (identity->srcpad, event); + + switch (GST_EVENT_TYPE (event)) { + case GST_EVENT_FLUSH: + g_print ("identity received flush event\n"); + /* forward event */ + gst_pad_event_default (pad, event); + if (GST_EVENT_FLUSH_DONE (event)) { + if (identity->sink_mode == GST_ACTIVATE_PULL) { + GST_STREAM_LOCK (identity->sinkpad); + gst_task_start (GST_RPAD_TASK (identity->sinkpad)); + GST_STREAM_UNLOCK (identity->sinkpad); + } + if (identity->src_mode == GST_ACTIVATE_PUSH) { + GST_STREAM_LOCK (identity->srcpad); + gst_task_start (GST_RPAD_TASK (identity->srcpad)); + GST_STREAM_UNLOCK (identity->srcpad); + } + } else { + /* unblock both functions */ + identity_queue_flush (identity); + + g_print ("identity after flush\n"); + } + ret = TRUE; + goto done; + case GST_EVENT_EOS: + g_print ("identity got eos\n"); + break; + default: + g_print ("identity got event %p of type %d\n", event, + GST_EVENT_TYPE (event)); + break; + } + + if (identity->decoupled) { + ret = identity_queue_push (identity, (GstData *) event); + } else { + ret = gst_pad_push_event (identity->srcpad, event); + } + +done: + GST_STREAM_UNLOCK (pad); + return ret; } static GstFlowReturn @@ -280,26 +399,106 @@ gst_identity_getrange (GstPad * pad, guint64 offset, guint length, GstBuffer ** buffer) { GstIdentity *identity; + GstFlowReturn ret; identity = GST_IDENTITY (GST_PAD_PARENT (pad)); - return gst_pad_pull_range (identity->sinkpad, offset, length, buffer); + GST_STREAM_LOCK (pad); + + ret = gst_pad_pull_range (identity->sinkpad, offset, length, buffer); + + GST_STREAM_UNLOCK (pad); + + return ret; } static GstFlowReturn gst_identity_chain (GstPad * pad, GstBuffer * buffer) { - GstBuffer *buf = GST_BUFFER (buffer); GstIdentity *identity; - GstFlowReturn result = GST_FLOW_OK; - guint i; - - g_return_val_if_fail (pad != NULL, GST_FLOW_ERROR); - g_return_val_if_fail (GST_IS_PAD (pad), GST_FLOW_ERROR); - g_return_val_if_fail (buf != NULL, GST_FLOW_ERROR); + GstFlowReturn ret = GST_FLOW_OK; identity = GST_IDENTITY (GST_PAD_PARENT (pad)); + GST_STREAM_LOCK (pad); + + ret = gst_identity_handle_buffer (identity, buffer); + + GST_STREAM_UNLOCK (pad); + + return ret; +} + +#define DEFAULT_PULL_SIZE 1024 + +static void +gst_identity_sink_loop (GstPad * pad) +{ + GstIdentity *identity; + GstBuffer *buffer; + GstFlowReturn ret; + + identity = GST_IDENTITY (GST_PAD_PARENT (pad)); + + GST_STREAM_LOCK (pad); + + ret = gst_pad_pull_range (pad, identity->offset, DEFAULT_PULL_SIZE, &buffer); + if (ret != GST_FLOW_OK) + goto sink_loop_pause; + + ret = gst_identity_handle_buffer (identity, buffer); + if (ret != GST_FLOW_OK) + goto sink_loop_pause; + + GST_STREAM_UNLOCK (pad); + return; + +sink_loop_pause: + gst_task_pause (GST_RPAD_TASK (identity->sinkpad)); + GST_STREAM_UNLOCK (pad); + return; +} + +static void +gst_identity_src_loop (GstPad * pad) +{ + GstIdentity *identity; + GstData *data; + GstFlowReturn ret; + + identity = GST_IDENTITY (GST_PAD_PARENT (pad)); + + GST_STREAM_LOCK (pad); + + data = identity_queue_pop (identity); + if (!data) /* we're getting flushed */ + goto src_loop_pause; + + if (GST_IS_EVENT (data)) { + if (GST_EVENT_TYPE (data) == GST_EVENT_EOS) + gst_task_pause (GST_RPAD_TASK (identity->srcpad)); + gst_pad_push_event (identity->srcpad, GST_EVENT (data)); + } else { + ret = gst_pad_push (identity->srcpad, (GstBuffer *) data); + if (ret != GST_FLOW_OK) + goto src_loop_pause; + } + + GST_STREAM_UNLOCK (pad); + return; + +src_loop_pause: + gst_task_pause (GST_RPAD_TASK (identity->srcpad)); + GST_STREAM_UNLOCK (pad); + return; +} + +static GstFlowReturn +gst_identity_handle_buffer (GstIdentity * identity, GstBuffer * buf) +{ + GstFlowReturn ret = GST_FLOW_OK; + guint i; + /* see if we need to do perfect stream checking */ /* invalid timestamp drops us out of check. FIXME: maybe warn ? */ if (identity->check_perfect && @@ -355,6 +554,7 @@ gst_identity_chain (GstPad * pad, GstBuffer * buffer) return GST_FLOW_OK; } } + if (identity->dump) { gst_util_dump_mem (GST_BUFFER_DATA (buf), GST_BUFFER_SIZE (buf)); } @@ -378,7 +578,7 @@ gst_identity_chain (GstPad * pad, GstBuffer * buffer) time = GST_BUFFER_TIMESTAMP (buf); if (identity->datarate > 0) { - time = identity->bytes_handled * GST_SECOND / identity->datarate; + time = identity->offset * GST_SECOND / identity->datarate; GST_BUFFER_TIMESTAMP (buf) = time; GST_BUFFER_DURATION (buf) = @@ -397,13 +597,45 @@ gst_identity_chain (GstPad * pad, GstBuffer * buffer) } } - identity->bytes_handled += GST_BUFFER_SIZE (buf); - result = gst_pad_push (identity->srcpad, buf); + identity->offset += GST_BUFFER_SIZE (buf); + if (identity->decoupled) { + if (!identity_queue_push (identity, (GstData *) buf)) + return GST_FLOW_UNEXPECTED; + } else { + ret = gst_pad_push (identity->srcpad, buf); + if (ret != GST_FLOW_OK) + return ret; + } if (identity->sleep_time) g_usleep (identity->sleep_time); } - return result; + + return ret; +} + +static void +gst_identity_set_dataflow_funcs (GstIdentity * identity) +{ + if (identity->has_getrange) + gst_pad_set_getrange_function (identity->srcpad, gst_identity_getrange); + else + gst_pad_set_getrange_function (identity->srcpad, NULL); + + if (identity->has_chain) + gst_pad_set_chain_function (identity->sinkpad, gst_identity_chain); + else + gst_pad_set_chain_function (identity->sinkpad, NULL); + + if (identity->has_src_loop) + gst_pad_set_loop_function (identity->srcpad, gst_identity_src_loop); + else + gst_pad_set_loop_function (identity->srcpad, NULL); + + if (identity->has_sink_loop) + gst_pad_set_loop_function (identity->sinkpad, gst_identity_sink_loop); + else + gst_pad_set_loop_function (identity->sinkpad, NULL); } static void @@ -412,45 +644,50 @@ gst_identity_set_property (GObject * object, guint prop_id, { GstIdentity *identity; - /* it's not null if we got it, but it might not be ours */ - g_return_if_fail (GST_IS_IDENTITY (object)); - identity = GST_IDENTITY (object); switch (prop_id) { - case ARG_LOOP_BASED: - identity->loop_based = g_value_get_boolean (value); - if (identity->loop_based) { - gst_pad_set_chain_function (identity->sinkpad, NULL); - } else { - gst_pad_set_chain_function (identity->sinkpad, gst_identity_chain); - } + case PROP_HAS_GETRANGE: + identity->has_getrange = g_value_get_boolean (value); + gst_identity_set_dataflow_funcs (identity); break; - case ARG_SLEEP_TIME: + case PROP_HAS_CHAIN: + identity->has_chain = g_value_get_boolean (value); + gst_identity_set_dataflow_funcs (identity); + break; + case PROP_HAS_SRC_LOOP: + identity->has_src_loop = g_value_get_boolean (value); + gst_identity_set_dataflow_funcs (identity); + break; + case PROP_HAS_SINK_LOOP: + identity->has_sink_loop = g_value_get_boolean (value); + gst_identity_set_dataflow_funcs (identity); + break; + case PROP_SLEEP_TIME: identity->sleep_time = g_value_get_uint (value); break; - case ARG_SILENT: + case PROP_SILENT: identity->silent = g_value_get_boolean (value); break; - case ARG_DUPLICATE: + case PROP_DUPLICATE: identity->duplicate = g_value_get_uint (value); break; - case ARG_DUMP: + case PROP_DUMP: identity->dump = g_value_get_boolean (value); break; - case ARG_ERROR_AFTER: + case PROP_ERROR_AFTER: identity->error_after = g_value_get_int (value); break; - case ARG_DROP_PROBABILITY: + case PROP_DROP_PROBABILITY: identity->drop_probability = g_value_get_float (value); break; - case ARG_DATARATE: + case PROP_DATARATE: identity->datarate = g_value_get_int (value); break; - case ARG_SYNC: + case PROP_SYNC: identity->sync = g_value_get_boolean (value); break; - case ARG_CHECK_PERFECT: + case PROP_CHECK_PERFECT: identity->check_perfect = g_value_get_boolean (value); break; default: @@ -465,43 +702,49 @@ gst_identity_get_property (GObject * object, guint prop_id, GValue * value, { GstIdentity *identity; - /* it's not null if we got it, but it might not be ours */ - g_return_if_fail (GST_IS_IDENTITY (object)); - identity = GST_IDENTITY (object); switch (prop_id) { - case ARG_LOOP_BASED: - g_value_set_boolean (value, identity->loop_based); + case PROP_HAS_GETRANGE: + g_value_set_boolean (value, identity->has_getrange); break; - case ARG_SLEEP_TIME: + case PROP_HAS_CHAIN: + g_value_set_boolean (value, identity->has_chain); + break; + case PROP_HAS_SRC_LOOP: + g_value_set_boolean (value, identity->has_src_loop); + break; + case PROP_HAS_SINK_LOOP: + g_value_set_boolean (value, identity->has_sink_loop); + break; + case PROP_SLEEP_TIME: g_value_set_uint (value, identity->sleep_time); break; - case ARG_DUPLICATE: + case PROP_DUPLICATE: g_value_set_uint (value, identity->duplicate); break; - case ARG_ERROR_AFTER: + case PROP_ERROR_AFTER: g_value_set_int (value, identity->error_after); break; - case ARG_DROP_PROBABILITY: + case PROP_DROP_PROBABILITY: g_value_set_float (value, identity->drop_probability); break; - case ARG_DATARATE: + case PROP_DATARATE: g_value_set_int (value, identity->datarate); break; - case ARG_SILENT: + case PROP_SILENT: g_value_set_boolean (value, identity->silent); break; - case ARG_DUMP: + case PROP_DUMP: g_value_set_boolean (value, identity->dump); break; - case ARG_LAST_MESSAGE: + case PROP_LAST_MESSAGE: g_value_set_string (value, identity->last_message); break; - case ARG_SYNC: + case PROP_SYNC: g_value_set_boolean (value, identity->sync); break; - case ARG_CHECK_PERFECT: + case PROP_CHECK_PERFECT: g_value_set_boolean (value, identity->check_perfect); break; default: @@ -523,7 +766,7 @@ gst_identity_change_state (GstElement * element) case GST_STATE_NULL_TO_READY: break; case GST_STATE_READY_TO_PAUSED: - identity->bytes_handled = 0; + identity->offset = 0; identity->prev_timestamp = GST_CLOCK_TIME_NONE; identity->prev_duration = GST_CLOCK_TIME_NONE; identity->prev_offset_end = -1; diff --git a/plugins/elements/gstidentity.h b/plugins/elements/gstidentity.h index 00203beed5..e187f2e2c2 100644 --- a/plugins/elements/gstidentity.h +++ b/plugins/elements/gstidentity.h @@ -50,7 +50,19 @@ struct _GstIdentity { GstPad *sinkpad; GstPad *srcpad; - gboolean loop_based; + GstData *pen_data; + GMutex *pen_lock; + GCond *pen_cond; + gboolean pen_flushing; + + gboolean has_chain; + gboolean has_getrange; + gboolean has_src_loop; + gboolean has_sink_loop; + GstActivateMode sink_mode; + GstActivateMode src_mode; + gboolean decoupled; + guint duplicate; gint error_after; gfloat drop_probability; @@ -67,7 +79,7 @@ struct _GstIdentity { gchar *last_message; GstCaps *srccaps; - guint64 bytes_handled; + guint64 offset; }; struct _GstIdentityClass {