splitmuxsrc: Implement state change asynchronously instead of blocking

Blocking in change_state() is a recipe for disaster, even more so if
we wait for another thread that also calls into various element API and
could then lead to deadlocks on e.g. the state lock.
This commit is contained in:
Sebastian Dröge 2019-01-09 11:42:36 +02:00
parent 8b155d7188
commit 99bb6f44ba
4 changed files with 298 additions and 100 deletions

View file

@ -88,6 +88,10 @@ static void type_found (GstElement * typefind, guint probability,
GstCaps * caps, GstSplitMuxPartReader * reader);
static void check_if_pads_collected (GstSplitMuxPartReader * reader);
static void
gst_splitmux_part_reader_finish_measuring_streams (GstSplitMuxPartReader *
reader);
/* Called with reader lock held */
static gboolean
have_empty_queue (GstSplitMuxPartReader * reader)
@ -415,7 +419,10 @@ splitmux_part_pad_event (GstPad * pad, GstObject * parent, GstEvent * event)
GST_LOG_OBJECT (reader,
"EOS while measuring streams. Resetting for ready");
reader->prep_state = PART_STATE_PREPARING_RESET_FOR_READY;
SPLITMUX_PART_BROADCAST (reader);
gst_element_call_async (GST_ELEMENT_CAST (reader),
(GstElementCallAsyncFunc)
gst_splitmux_part_reader_finish_measuring_streams, NULL, NULL);
}
goto drop_event;
}
@ -689,6 +696,37 @@ splitmux_part_reader_finalize (GObject * object)
G_OBJECT_CLASS (parent_class)->finalize (object);
}
static void
do_async_start (GstSplitMuxPartReader * reader)
{
GstMessage *message;
GST_STATE_LOCK (reader);
reader->async_pending = TRUE;
message = gst_message_new_async_start (GST_OBJECT_CAST (reader));
GST_BIN_CLASS (parent_class)->handle_message (GST_BIN_CAST (reader), message);
GST_STATE_UNLOCK (reader);
}
static void
do_async_done (GstSplitMuxPartReader * reader)
{
GstMessage *message;
GST_STATE_LOCK (reader);
if (reader->async_pending) {
message =
gst_message_new_async_done (GST_OBJECT_CAST (reader),
GST_CLOCK_TIME_NONE);
GST_BIN_CLASS (parent_class)->handle_message (GST_BIN_CAST (reader),
message);
reader->async_pending = FALSE;
}
GST_STATE_UNLOCK (reader);
}
static void
splitmux_part_reader_reset (GstSplitMuxPartReader * reader)
{
@ -850,6 +888,7 @@ gst_splitmux_part_reader_seek_to_segment (GstSplitMuxPartReader * reader,
static void
gst_splitmux_part_reader_measure_streams (GstSplitMuxPartReader * reader)
{
SPLITMUX_PART_LOCK (reader);
/* Trigger a flushing seek to near the end of the file and run each stream
* to EOS in order to find the smallest end timestamp to start the next
* file from
@ -859,18 +898,25 @@ gst_splitmux_part_reader_measure_streams (GstSplitMuxPartReader * reader)
GstClockTime seek_ts = reader->duration - (0.5 * GST_SECOND);
gst_splitmux_part_reader_seek_to_time_locked (reader, seek_ts);
}
SPLITMUX_PART_UNLOCK (reader);
}
/* Wait for things to happen */
while (reader->prep_state == PART_STATE_PREPARING_MEASURE_STREAMS)
SPLITMUX_PART_WAIT (reader);
static void
gst_splitmux_part_reader_finish_measuring_streams (GstSplitMuxPartReader *
reader)
{
SPLITMUX_PART_LOCK (reader);
if (reader->prep_state == PART_STATE_PREPARING_RESET_FOR_READY) {
/* Fire the prepared signal and go to READY state */
GST_DEBUG_OBJECT (reader,
"Stream measuring complete. File %s is now ready. Firing prepared signal",
reader->path);
reader->prep_state = PART_STATE_READY;
SPLITMUX_PART_UNLOCK (reader);
g_signal_emit (reader, part_reader_signals[SIGNAL_PREPARED], 0, NULL);
do_async_done (reader);
} else {
SPLITMUX_PART_UNLOCK (reader);
}
}
@ -939,7 +985,9 @@ check_if_pads_collected (GstSplitMuxPartReader * reader)
GST_DEBUG_OBJECT (reader,
"no more pads - file %s. Measuring stream length", reader->path);
reader->prep_state = PART_STATE_PREPARING_MEASURE_STREAMS;
SPLITMUX_PART_BROADCAST (reader);
gst_element_call_async (GST_ELEMENT_CAST (reader),
(GstElementCallAsyncFunc) gst_splitmux_part_reader_measure_streams,
NULL, NULL);
}
}
}
@ -1041,16 +1089,16 @@ gst_splitmux_part_reader_change_state (GstElement * element,
break;
}
case GST_STATE_CHANGE_READY_TO_PAUSED:{
/* Hold the splitmux type lock until after the
* parent state change function has finished
* changing the states of things, and type finding can continue */
SPLITMUX_PART_LOCK (reader);
g_object_set (reader->src, "location", reader->path, NULL);
reader->prep_state = PART_STATE_PREPARING_COLLECT_STREAMS;
gst_splitmux_part_reader_set_flushing_locked (reader, FALSE);
reader->running = TRUE;
SPLITMUX_PART_UNLOCK (reader);
SPLITMUX_PART_TYPE_LOCK (reader);
/* we go to PAUSED asynchronously once all streams have been collected
* and seeks to measure the stream lengths are done */
do_async_start (reader);
break;
}
case GST_STATE_CHANGE_READY_TO_NULL:
@ -1074,33 +1122,16 @@ gst_splitmux_part_reader_change_state (GstElement * element,
ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
if (ret == GST_STATE_CHANGE_FAILURE) {
if (transition == GST_STATE_CHANGE_READY_TO_PAUSED) {
/* Make sure to release the lock we took above */
SPLITMUX_PART_TYPE_UNLOCK (reader);
}
do_async_done (reader);
goto beach;
}
switch (transition) {
case GST_STATE_CHANGE_READY_TO_PAUSED:
/* Sleep and wait until all streams have been collected, then do the seeks
* to measure the stream lengths. This took the type lock above,
* but it's OK to release it now and let typefinding happen... */
SPLITMUX_PART_TYPE_UNLOCK (reader);
SPLITMUX_PART_LOCK (reader);
while (reader->prep_state == PART_STATE_PREPARING_COLLECT_STREAMS) {
GST_LOG_OBJECT (reader, "Waiting to collect all output streams");
SPLITMUX_PART_WAIT (reader);
}
if (reader->prep_state == PART_STATE_PREPARING_MEASURE_STREAMS ||
reader->prep_state == PART_STATE_PREPARING_RESET_FOR_READY) {
gst_splitmux_part_reader_measure_streams (reader);
} else if (reader->prep_state == PART_STATE_FAILED)
ret = GST_STATE_CHANGE_FAILURE;
SPLITMUX_PART_UNLOCK (reader);
ret = GST_STATE_CHANGE_ASYNC;
break;
case GST_STATE_CHANGE_PAUSED_TO_READY:
do_async_done (reader);
break;
case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
SPLITMUX_PART_LOCK (reader);
@ -1121,28 +1152,6 @@ beach:
return ret;
}
static gboolean
check_bus_messages (GstSplitMuxPartReader * part)
{
gboolean ret = FALSE;
GstBus *bus;
GstMessage *m;
bus = gst_element_get_bus (GST_ELEMENT_CAST (part));
while ((m = gst_bus_pop (bus)) != NULL) {
if (GST_MESSAGE_TYPE (m) == GST_MESSAGE_ERROR) {
GST_LOG_OBJECT (part, "Got error message while preparing. Failing.");
gst_message_unref (m);
goto done;
}
gst_message_unref (m);
}
ret = TRUE;
done:
gst_object_unref (bus);
return ret;
}
gboolean
gst_splitmux_part_reader_prepare (GstSplitMuxPartReader * part)
{
@ -1150,10 +1159,10 @@ gst_splitmux_part_reader_prepare (GstSplitMuxPartReader * part)
ret = gst_element_set_state (GST_ELEMENT_CAST (part), GST_STATE_PAUSED);
if (ret != GST_STATE_CHANGE_SUCCESS)
if (ret == GST_STATE_CHANGE_FAILURE)
return FALSE;
return check_bus_messages (part);
return TRUE;
}
void
@ -1361,6 +1370,7 @@ bus_handler (GstBin * bin, GstMessage * message)
reader->prep_state = PART_STATE_FAILED;
SPLITMUX_PART_BROADCAST (reader);
SPLITMUX_PART_UNLOCK (reader);
do_async_done (reader);
break;
default:
break;

View file

@ -64,6 +64,7 @@ struct _GstSplitMuxPartReader
GstElement *typefind;
GstElement *demux;
gboolean async_pending;
gboolean active;
gboolean running;
gboolean prepared;

View file

@ -114,6 +114,9 @@ static gboolean gst_splitmux_end_of_part (GstSplitMuxSrc * splitmux,
SplitMuxSrcPad * pad);
static gboolean gst_splitmux_check_new_caps (SplitMuxSrcPad * splitpad,
GstEvent * event);
static gboolean gst_splitmux_src_prepare_next_part (GstSplitMuxSrc * splitmux);
static gboolean gst_splitmux_src_activate_part (GstSplitMuxSrc * splitmux,
guint part, GstSeekFlags extra_flags);
#define _do_init \
G_IMPLEMENT_INTERFACE(GST_TYPE_URI_HANDLER, splitmux_src_uri_handler_init);
@ -315,6 +318,38 @@ gst_splitmux_src_get_property (GObject * object, guint prop_id,
}
}
static void
do_async_start (GstSplitMuxSrc * splitmux)
{
GstMessage *message;
GST_STATE_LOCK (splitmux);
splitmux->async_pending = TRUE;
message = gst_message_new_async_start (GST_OBJECT_CAST (splitmux));
GST_BIN_CLASS (parent_class)->handle_message (GST_BIN_CAST (splitmux),
message);
GST_STATE_UNLOCK (splitmux);
}
static void
do_async_done (GstSplitMuxSrc * splitmux)
{
GstMessage *message;
GST_STATE_LOCK (splitmux);
if (splitmux->async_pending) {
message =
gst_message_new_async_done (GST_OBJECT_CAST (splitmux),
GST_CLOCK_TIME_NONE);
GST_BIN_CLASS (parent_class)->handle_message (GST_BIN_CAST (splitmux),
message);
splitmux->async_pending = FALSE;
}
GST_STATE_UNLOCK (splitmux);
}
static GstStateChangeReturn
gst_splitmux_src_change_state (GstElement * element, GstStateChange transition)
{
@ -326,8 +361,12 @@ gst_splitmux_src_change_state (GstElement * element, GstStateChange transition)
break;
}
case GST_STATE_CHANGE_READY_TO_PAUSED:{
if (!gst_splitmux_src_start (splitmux))
do_async_start (splitmux);
if (!gst_splitmux_src_start (splitmux)) {
do_async_done (splitmux);
return GST_STATE_CHANGE_FAILURE;
}
break;
}
case GST_STATE_CHANGE_PAUSED_TO_READY:
@ -341,14 +380,150 @@ gst_splitmux_src_change_state (GstElement * element, GstStateChange transition)
}
ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
if (ret == GST_STATE_CHANGE_FAILURE) {
do_async_done (splitmux);
return ret;
}
switch (transition) {
case GST_STATE_CHANGE_READY_TO_PAUSED:
ret = GST_STATE_CHANGE_ASYNC;
break;
default:
break;
}
return ret;
}
static gboolean gst_splitmux_src_prepare_next_part (GstSplitMuxSrc * splitmux);
static gboolean gst_splitmux_src_activate_part (GstSplitMuxSrc * splitmux,
guint part, GstSeekFlags extra_flags);
static void
gst_splitmux_src_activate_first_part (GstSplitMuxSrc * splitmux)
{
if (!gst_splitmux_src_activate_part (splitmux, 0, GST_SEEK_FLAG_NONE)) {
GST_ELEMENT_ERROR (splitmux, RESOURCE, OPEN_READ, (NULL),
("Failed to activate first part for playback"));
}
}
static GstBusSyncReply
gst_splitmux_part_bus_handler (GstBus * bus, GstMessage * msg,
gpointer user_data)
{
GstSplitMuxSrc *splitmux = user_data;
switch (GST_MESSAGE_TYPE (msg)) {
case GST_MESSAGE_ASYNC_DONE:{
guint idx = splitmux->num_prepared_parts;
if (idx >= splitmux->num_parts) {
/* Shouldn't really happen! */
do_async_done (splitmux);
g_warn_if_reached ();
break;
}
GST_DEBUG_OBJECT (splitmux, "Prepared file part %s (%u)",
splitmux->parts[idx]->path, idx);
/* Extend our total duration to cover this part */
GST_OBJECT_LOCK (splitmux);
splitmux->total_duration +=
gst_splitmux_part_reader_get_duration (splitmux->parts[idx]);
splitmux->play_segment.duration = splitmux->total_duration;
GST_OBJECT_UNLOCK (splitmux);
splitmux->end_offset =
gst_splitmux_part_reader_get_end_offset (splitmux->parts[idx]);
GST_DEBUG_OBJECT (splitmux,
"Duration %" GST_TIME_FORMAT ", total duration now: %" GST_TIME_FORMAT
" and end offset %" GST_TIME_FORMAT,
gst_splitmux_part_reader_get_duration (splitmux->parts[idx]),
splitmux->total_duration, splitmux->end_offset);
splitmux->num_prepared_parts++;
/* If we're done or preparing the next part fails, finish here */
if (splitmux->num_prepared_parts >= splitmux->num_parts
|| !gst_splitmux_src_prepare_next_part (splitmux)) {
/* Store how many parts we actually prepared in the end */
splitmux->num_parts = splitmux->num_prepared_parts;
do_async_done (splitmux);
/* All done preparing, activate the first part */
GST_INFO_OBJECT (splitmux,
"All parts prepared. Total duration %" GST_TIME_FORMAT
" Activating first part", GST_TIME_ARGS (splitmux->total_duration));
gst_element_call_async (GST_ELEMENT_CAST (splitmux),
(GstElementCallAsyncFunc) gst_splitmux_src_activate_first_part,
NULL, NULL);
}
break;
}
case GST_MESSAGE_ERROR:{
GST_ERROR_OBJECT (splitmux,
"Got error message from part %" GST_PTR_FORMAT ": %" GST_PTR_FORMAT,
GST_MESSAGE_SRC (msg), msg);
if (splitmux->num_prepared_parts < splitmux->num_parts) {
guint idx = splitmux->num_prepared_parts;
if (idx == 0) {
GST_ERROR_OBJECT (splitmux,
"Failed to prepare first file part %s for playback",
splitmux->parts[idx]->path);
GST_ELEMENT_ERROR (splitmux, RESOURCE, OPEN_READ, (NULL),
("Failed to prepare first file part %s for playback",
splitmux->parts[idx]->path));
} else {
GST_WARNING_OBJECT (splitmux,
"Failed to prepare file part %s. Cannot play past there.",
splitmux->parts[idx]->path);
GST_ELEMENT_WARNING (splitmux, RESOURCE, READ, (NULL),
("Failed to prepare file part %s. Cannot play past there.",
splitmux->parts[idx]->path));
}
/* Store how many parts we actually prepared in the end */
splitmux->num_parts = splitmux->num_prepared_parts;
do_async_done (splitmux);
if (idx > 0) {
/* All done preparing, activate the first part */
GST_INFO_OBJECT (splitmux,
"All parts prepared. Total duration %" GST_TIME_FORMAT
" Activating first part",
GST_TIME_ARGS (splitmux->total_duration));
gst_element_call_async (GST_ELEMENT_CAST (splitmux),
(GstElementCallAsyncFunc) gst_splitmux_src_activate_first_part,
NULL, NULL);
}
} else {
/* Need to update the message source so that it's part of the element
* hierarchy the application would expect */
msg = gst_message_copy (msg);
gst_object_replace ((GstObject **) & msg->src, (GstObject *) splitmux);
gst_element_post_message (GST_ELEMENT_CAST (splitmux), msg);
}
break;
}
default:
break;
}
return GST_BUS_PASS;
}
static GstSplitMuxPartReader *
gst_splitmux_part_create (GstSplitMuxSrc * splitmux, char *filename)
{
GstSplitMuxPartReader *r;
GstBus *bus;
r = g_object_new (GST_TYPE_SPLITMUX_PART_READER, NULL);
@ -359,6 +534,10 @@ gst_splitmux_part_create (GstSplitMuxSrc * splitmux, char *filename)
(GstSplitMuxPartReaderPadCb) gst_splitmux_find_output_pad);
gst_splitmux_part_reader_set_location (r, filename);
bus = gst_element_get_bus (GST_ELEMENT_CAST (r));
gst_bus_set_sync_handler (bus, gst_splitmux_part_bus_handler, splitmux, NULL);
gst_object_unref (bus);
return r;
}
@ -653,6 +832,34 @@ gst_splitmux_src_activate_part (GstSplitMuxSrc * splitmux, guint part,
return TRUE;
}
static gboolean
gst_splitmux_src_prepare_next_part (GstSplitMuxSrc * splitmux)
{
guint idx = splitmux->num_prepared_parts;
g_assert (idx < splitmux->num_parts);
GST_DEBUG_OBJECT (splitmux, "Preparing file part %s (%u)",
splitmux->parts[idx]->path, idx);
gst_splitmux_part_reader_set_start_offset (splitmux->parts[idx],
splitmux->end_offset);
if (!gst_splitmux_part_reader_prepare (splitmux->parts[idx])) {
GST_WARNING_OBJECT (splitmux,
"Failed to prepare file part %s. Cannot play past there.",
splitmux->parts[idx]->path);
GST_ELEMENT_WARNING (splitmux, RESOURCE, READ, (NULL),
("Failed to prepare file part %s. Cannot play past there.",
splitmux->parts[idx]->path));
gst_splitmux_part_reader_unprepare (splitmux->parts[idx]);
g_object_unref (splitmux->parts[idx]);
splitmux->parts[idx] = NULL;
return FALSE;
}
return TRUE;
}
static gboolean
gst_splitmux_src_start (GstSplitMuxSrc * splitmux)
{
@ -661,9 +868,7 @@ gst_splitmux_src_start (GstSplitMuxSrc * splitmux)
gchar *basename = NULL;
gchar *dirname = NULL;
gchar **files;
GstClockTime next_offset = 0;
guint i;
GstClockTime total_duration = 0;
GST_DEBUG_OBJECT (splitmux, "Starting");
@ -693,52 +898,33 @@ gst_splitmux_src_start (GstSplitMuxSrc * splitmux)
splitmux->parts = g_new0 (GstSplitMuxPartReader *, splitmux->num_parts);
/* Create all part pipelines */
for (i = 0; i < splitmux->num_parts; i++) {
splitmux->parts[i] = gst_splitmux_part_create (splitmux, files[i]);
if (splitmux->parts[i] == NULL)
break;
/* Figure out the next offset - the smallest one */
gst_splitmux_part_reader_set_start_offset (splitmux->parts[i], next_offset);
if (!gst_splitmux_part_reader_prepare (splitmux->parts[i])) {
GST_WARNING_OBJECT (splitmux,
"Failed to prepare file part %s. Cannot play past there.", files[i]);
GST_ELEMENT_WARNING (splitmux, RESOURCE, READ, (NULL),
("Failed to prepare file part %s. Cannot play past there.",
files[i]));
gst_splitmux_part_reader_unprepare (splitmux->parts[i]);
g_object_unref (splitmux->parts[i]);
splitmux->parts[i] = NULL;
break;
}
/* Extend our total duration to cover this part */
total_duration =
next_offset +
gst_splitmux_part_reader_get_duration (splitmux->parts[i]);
splitmux->play_segment.duration = total_duration;
next_offset = gst_splitmux_part_reader_get_end_offset (splitmux->parts[i]);
}
/* Store how many parts we actually created */
splitmux->num_created_parts = splitmux->num_parts = i;
splitmux->num_prepared_parts = 0;
/* Update total_duration state variable */
GST_OBJECT_LOCK (splitmux);
splitmux->total_duration = total_duration;
splitmux->total_duration = 0;
splitmux->end_offset = 0;
GST_OBJECT_UNLOCK (splitmux);
/* Store how many parts we actually created */
splitmux->num_parts = i;
if (splitmux->num_parts < 1)
/* Then start the first: it will asynchronously go to PAUSED
* or error out and then we can proceed with the next one
*/
if (!gst_splitmux_src_prepare_next_part (splitmux) || splitmux->num_parts < 1)
goto failed_part;
/* All done preparing, activate the first part */
GST_INFO_OBJECT (splitmux,
"All parts prepared. Total duration %" GST_TIME_FORMAT
" Activating first part", GST_TIME_ARGS (total_duration));
ret = gst_splitmux_src_activate_part (splitmux, 0, GST_SEEK_FLAG_NONE);
if (ret == FALSE)
goto failed_first_part;
/* All good now: we have to wait for all parts to be asynchronously
* prepared to know the total duration we can play */
ret = TRUE;
done:
if (err != NULL)
g_error_free (err);
@ -762,12 +948,6 @@ failed_part:
("Failed to open any files for reading"));
goto done;
}
failed_first_part:
{
GST_ELEMENT_ERROR (splitmux, RESOURCE, OPEN_READ, (NULL),
("Failed to activate first part for playback"));
goto done;
}
}
static gboolean
@ -784,7 +964,7 @@ gst_splitmux_src_stop (GstSplitMuxSrc * splitmux)
GST_DEBUG_OBJECT (splitmux, "Stopping");
/* Stop and destroy all parts */
for (i = 0; i < splitmux->num_parts; i++) {
for (i = 0; i < splitmux->num_created_parts; i++) {
if (splitmux->parts[i] == NULL)
continue;
gst_splitmux_part_reader_unprepare (splitmux->parts[i]);
@ -809,6 +989,8 @@ gst_splitmux_src_stop (GstSplitMuxSrc * splitmux)
g_free (splitmux->parts);
splitmux->parts = NULL;
splitmux->num_parts = 0;
splitmux->num_prepared_parts = 0;
splitmux->num_created_parts = 0;
splitmux->running = FALSE;
splitmux->total_duration = GST_CLOCK_TIME_NONE;
/* Reset playback segment */

View file

@ -50,15 +50,20 @@ struct _GstSplitMuxSrc
GstSplitMuxPartReader **parts;
guint num_parts;
guint num_prepared_parts;
guint num_created_parts;
guint cur_part;
gboolean async_pending;
gboolean pads_complete;
GMutex pads_lock;
GList *pads; /* pads_lock */
guint n_pads;
guint n_notlinked;
GstClockTime total_duration;
GstClockTime end_offset;
GstSegment play_segment;
guint32 segment_seqnum;
};