adder: rework pending event handling

Use atomic ops on pending flags. Rename the segment_pending to
new_segment_pending. Set new_segment_pending not when we received seek, but
when we received the first upstream new_segment.
This commit is contained in:
Stefan Kost 2011-07-25 19:39:55 +02:00
parent a8228b062a
commit 9a26e6c7bc
2 changed files with 21 additions and 11 deletions

View file

@ -672,7 +672,7 @@ gst_adder_src_event (GstPad * pad, GstEvent * event)
* forwarding the seek upstream or from gst_adder_collected,
* whichever happens first.
*/
adder->flush_stop_pending = TRUE;
g_atomic_int_set (&adder->flush_stop_pending, TRUE);
}
GST_DEBUG_OBJECT (adder, "handling seek event: %" GST_PTR_FORMAT, event);
@ -688,9 +688,6 @@ gst_adder_src_event (GstPad * pad, GstEvent * event)
adder->segment_end = end;
else
adder->segment_end = GST_CLOCK_TIME_NONE;
/* make sure we push a new segment, to inform about new basetime
* see FIXME in gst_adder_collected() */
adder->segment_pending = TRUE;
if (flush) {
/* Yes, we need to call _set_flushing again *WHEN* the streaming threads
* have stopped so that the cookie gets properly updated. */
@ -700,6 +697,9 @@ gst_adder_src_event (GstPad * pad, GstEvent * event)
GST_DEBUG_OBJECT (adder, "forwarding seek event: %" GST_PTR_FORMAT,
event);
/* we're forwarding seek to all upstream peers and wait for one to reply
* with a newsegment-event before we send a newsegment-event downstream */
g_atomic_int_set (&adder->wait_for_new_segment, TRUE);
result = forward_event (adder, event, flush);
if (!result) {
/* seek failed. maybe source is a live source. */
@ -755,8 +755,8 @@ gst_adder_sink_event (GstPad * pad, GstEvent * event)
* flush-stop from the collect function later.
*/
GST_OBJECT_LOCK (adder->collect);
adder->segment_pending = TRUE;
adder->flush_stop_pending = FALSE;
g_atomic_int_set (&adder->new_segment_pending, TRUE);
g_atomic_int_set (&adder->flush_stop_pending, FALSE);
/* Clear pending tags */
if (adder->pending_events) {
g_list_foreach (adder->pending_events, (GFunc) gst_event_unref, NULL);
@ -771,6 +771,14 @@ gst_adder_sink_event (GstPad * pad, GstEvent * event)
adder->pending_events = g_list_append (adder->pending_events, event);
GST_OBJECT_UNLOCK (adder->collect);
goto beach;
case GST_EVENT_NEWSEGMENT:
if (g_atomic_int_compare_and_exchange (&adder->wait_for_new_segment,
TRUE, FALSE)) {
/* make sure we push a new segment, to inform about new basetime
* see FIXME in gst_adder_collected() */
g_atomic_int_set (&adder->new_segment_pending, TRUE);
}
break;
default:
break;
}
@ -1150,7 +1158,8 @@ gst_adder_collected (GstCollectPads * pads, gpointer user_data)
/* we had an output buffer, unref the gapbuffer we kept */
gst_buffer_unref (gapbuf);
if (adder->segment_pending) {
if (g_atomic_int_compare_and_exchange (&adder->new_segment_pending, TRUE,
FALSE)) {
GstEvent *event;
/* FIXME, use rate/applied_rate as set on all sinkpads.
@ -1180,7 +1189,6 @@ gst_adder_collected (GstCollectPads * pads, gpointer user_data)
GST_WARNING_OBJECT (adder->srcpad, "Sending event %p (%s) failed.",
event, GST_EVENT_TYPE_NAME (event));
}
adder->segment_pending = FALSE;
} else {
GST_WARNING_OBJECT (adder->srcpad, "Creating new segment event for "
"start:%" G_GINT64_FORMAT " end:%" G_GINT64_FORMAT " failed",
@ -1268,7 +1276,8 @@ gst_adder_change_state (GstElement * element, GstStateChange transition)
adder->timestamp = 0;
adder->offset = 0;
adder->flush_stop_pending = FALSE;
adder->segment_pending = TRUE;
adder->new_segment_pending = TRUE;
adder->wait_for_new_segment = FALSE;
adder->segment_start = 0;
adder->segment_end = GST_CLOCK_TIME_NONE;
adder->segment_rate = 1.0;

View file

@ -85,11 +85,12 @@ struct _GstAdder {
/* sink event handling */
GstPadEventFunction collect_event;
GstSegment segment;
gboolean segment_pending;
guint64 segment_start, segment_end;
gdouble segment_rate;
volatile gboolean new_segment_pending;
volatile gboolean wait_for_new_segment;
/* src event handling */
gboolean flush_stop_pending;
volatile gboolean flush_stop_pending;
/* target caps */
GstCaps *filter_caps;