diff --git a/ext/dash/gstdashdemux.c b/ext/dash/gstdashdemux.c index 58fec3230e..b751a6351d 100644 --- a/ext/dash/gstdashdemux.c +++ b/ext/dash/gstdashdemux.c @@ -200,7 +200,7 @@ static GstFlowReturn gst_dash_demux_stream_seek (GstAdaptiveDemuxStream * stream, GstClockTime ts); static GstFlowReturn gst_dash_demux_stream_advance_fragment (GstAdaptiveDemuxStream * stream); -static void +static gboolean gst_dash_demux_stream_advance_subfragment (GstAdaptiveDemuxStream * stream); static gboolean gst_dash_demux_stream_select_bitrate (GstAdaptiveDemuxStream * stream, guint64 bitrate); @@ -213,8 +213,11 @@ gst_dash_demux_stream_get_fragment_waiting_time (GstAdaptiveDemuxStream * stream); static void gst_dash_demux_advance_period (GstAdaptiveDemux * demux); static gboolean gst_dash_demux_has_next_period (GstAdaptiveDemux * demux); -static GstFlowReturn gst_dash_demux_chunk_received (GstAdaptiveDemux * demux, - GstAdaptiveDemuxStream * stream, GstBuffer ** chunk); +static GstFlowReturn gst_dash_demux_data_received (GstAdaptiveDemux * demux, + GstAdaptiveDemuxStream * stream); +static GstFlowReturn +gst_dash_demux_stream_fragment_finished (GstAdaptiveDemux * demux, + GstAdaptiveDemuxStream * stream); /* GstDashDemux */ static gboolean gst_dash_demux_setup_all_streams (GstDashDemux * demux); @@ -600,7 +603,8 @@ gst_dash_demux_process_manifest (GstAdaptiveDemux * demux, GstBuffer * buf) if (gst_mpd_client_has_isoff_ondemand_profile (dashdemux->client)) { klass = GST_ADAPTIVE_DEMUX_GET_CLASS (dashdemux); - klass->chunk_received = gst_dash_demux_chunk_received; + klass->data_received = gst_dash_demux_data_received; + klass->finish_fragment = gst_dash_demux_stream_fragment_finished; } if (gst_mpd_client_setup_media_presentation (dashdemux->client)) { @@ -882,7 +886,7 @@ gst_dash_demux_stream_seek (GstAdaptiveDemuxStream * stream, GstClockTime ts) return GST_FLOW_OK; } -static void +static gboolean gst_dash_demux_stream_advance_subfragment (GstAdaptiveDemuxStream * stream) { GstDashDemuxStream *dashstream = (GstDashDemuxStream *) stream; @@ -905,6 +909,7 @@ gst_dash_demux_stream_advance_subfragment (GstAdaptiveDemuxStream * stream) if (!fragment_finished) { dashstream->sidx_current_remaining = sidx->entries[sidx->entry_index].size; } + return !fragment_finished; } static GstFlowReturn @@ -914,17 +919,8 @@ gst_dash_demux_stream_advance_fragment (GstAdaptiveDemuxStream * stream) GstDashDemux *dashdemux = GST_DASH_DEMUX_CAST (stream->demux); if (gst_mpd_client_has_isoff_ondemand_profile (dashdemux->client)) { - GstSidxBox *sidx = SIDX (dashstream); - - if (stream->demux->segment.rate > 0.0) { - if (sidx->entry_index < sidx->entries_count) { - return GST_FLOW_OK; - } - } else { - if (sidx->entry_index >= 0) { - return GST_FLOW_OK; - } - } + if (gst_dash_demux_stream_advance_subfragment (stream)) + return GST_FLOW_OK; } return gst_mpd_client_advance_segment (dashdemux->client, @@ -986,22 +982,14 @@ gst_dash_demux_stream_select_bitrate (GstAdaptiveDemuxStream * stream, if (gst_mpd_client_has_isoff_ondemand_profile (demux->client)) { - /* a new subsegment is going to start, cleanup any pending data from the - * previous one */ + /* store our current position to change to the same one in a different + * representation if needed */ dashstream->sidx_index = SIDX (dashstream)->entry_index; - if (dashstream->pending_buffer) { - gst_buffer_unref (dashstream->pending_buffer); - dashstream->pending_buffer = NULL; - } - if (ret) { /* TODO cache indexes to avoid re-downloading and parsing */ /* if we switched, we need a new index */ gst_isoff_sidx_parser_clear (&dashstream->sidx_parser); gst_isoff_sidx_parser_init (&dashstream->sidx_parser); - } else { - dashstream->sidx_current_remaining = - SIDX_ENTRY (dashstream, dashstream->sidx_index)->size; } } @@ -1071,11 +1059,6 @@ gst_dash_demux_seek (GstAdaptiveDemux * demux, GstEvent * seek) gst_isoff_sidx_parser_clear (&dashstream->sidx_parser); gst_isoff_sidx_parser_init (&dashstream->sidx_parser); } - - if (dashstream->pending_buffer) { - gst_buffer_unref (dashstream->pending_buffer); - dashstream->pending_buffer = NULL; - } gst_dash_demux_stream_seek (iter->data, target_pos); } return TRUE; @@ -1247,88 +1230,112 @@ gst_dash_demux_advance_period (GstAdaptiveDemux * demux) } static GstBuffer * -_gst_buffer_split (GstBuffer ** buffer, gint offset, gsize size) +_gst_buffer_split (GstBuffer * buffer, gint offset, gsize size) { - GstBuffer *newbuf = gst_buffer_copy_region (*buffer, + GstBuffer *newbuf = gst_buffer_copy_region (buffer, GST_BUFFER_COPY_FLAGS | GST_BUFFER_COPY_TIMESTAMPS | GST_BUFFER_COPY_META | GST_BUFFER_COPY_MEMORY, offset, size - offset); - gst_buffer_resize (*buffer, 0, offset); + gst_buffer_resize (buffer, 0, offset); return newbuf; } static GstFlowReturn -gst_dash_demux_chunk_received (GstAdaptiveDemux * demux, - GstAdaptiveDemuxStream * stream, GstBuffer ** chunk) +gst_dash_demux_stream_fragment_finished (GstAdaptiveDemux * demux, + GstAdaptiveDemuxStream * stream) +{ + GstDashDemux *dashdemux = GST_DASH_DEMUX_CAST (demux); + + if (gst_mpd_client_has_isoff_ondemand_profile (dashdemux->client)) { + /* fragment is advanced on data_received when byte limits are reached */ + return GST_FLOW_OK; + } else { + return gst_adaptive_demux_stream_advance_fragment (demux, stream, + stream->fragment.duration); + } +} + +static GstFlowReturn +gst_dash_demux_data_received (GstAdaptiveDemux * demux, + GstAdaptiveDemuxStream * stream) { GstDashDemuxStream *dash_stream = (GstDashDemuxStream *) stream; GstFlowReturn ret = GST_FLOW_OK; + GstBuffer *buffer; + gsize available; - if (*chunk == NULL) { - if (dash_stream->pending_buffer) { - *chunk = dash_stream->pending_buffer; - dash_stream->pending_buffer = NULL; - } - return GST_FLOW_OK; - } - - if (dash_stream->pending_buffer) { - *chunk = gst_buffer_append (dash_stream->pending_buffer, *chunk); - dash_stream->pending_buffer = NULL; - } - - if (stream->downloading_index - && dash_stream->sidx_parser.status != GST_ISOFF_SIDX_PARSER_FINISHED) { + if (stream->downloading_index) { GstIsoffParserResult res; guint consumed; - res = - gst_isoff_sidx_parser_add_buffer (&dash_stream->sidx_parser, *chunk, - &consumed); + available = gst_adapter_available (stream->adapter); + buffer = gst_adapter_take_buffer (stream->adapter, available); - if (res == GST_ISOFF_PARSER_ERROR) { - } else if (res == GST_ISOFF_PARSER_UNEXPECTED) { - /* this is not a 'sidx' index, just skip it and continue playback */ - } else { - /* when finished, prepare for real data streaming */ - if (dash_stream->sidx_parser.status == GST_ISOFF_SIDX_PARSER_FINISHED) { - if (GST_CLOCK_TIME_IS_VALID (dash_stream->pending_seek_ts)) { - gst_dash_demux_stream_sidx_seek (dash_stream, - dash_stream->pending_seek_ts); - dash_stream->pending_seek_ts = GST_CLOCK_TIME_NONE; - } else { - SIDX (dash_stream)->entry_index = dash_stream->sidx_index; + if (dash_stream->sidx_parser.status != GST_ISOFF_SIDX_PARSER_FINISHED) { + res = + gst_isoff_sidx_parser_add_buffer (&dash_stream->sidx_parser, buffer, + &consumed); + + if (res == GST_ISOFF_PARSER_ERROR) { + } else if (res == GST_ISOFF_PARSER_UNEXPECTED) { + /* this is not a 'sidx' index, just skip it and continue playback */ + } else { + /* when finished, prepare for real data streaming */ + if (dash_stream->sidx_parser.status == GST_ISOFF_SIDX_PARSER_FINISHED) { + if (GST_CLOCK_TIME_IS_VALID (dash_stream->pending_seek_ts)) { + gst_dash_demux_stream_sidx_seek (dash_stream, + dash_stream->pending_seek_ts); + dash_stream->pending_seek_ts = GST_CLOCK_TIME_NONE; + } else { + SIDX (dash_stream)->entry_index = dash_stream->sidx_index; + } + dash_stream->sidx_current_remaining = + SIDX_CURRENT_ENTRY (dash_stream)->size; + } else if (consumed < available) { + GstBuffer *pending; + /* we still need to keep some data around for the next parsing round + * so just push what was already processed by the parser */ + pending = _gst_buffer_split (buffer, consumed, -1); + gst_adapter_push (stream->adapter, pending); } - dash_stream->sidx_current_remaining = - SIDX_CURRENT_ENTRY (dash_stream)->size; - } else if (consumed < gst_buffer_get_size (*chunk)) { - dash_stream->pending_buffer = _gst_buffer_split (chunk, consumed, -1); } } - } + ret = gst_adaptive_demux_stream_push_buffer (stream, buffer); + } else if (dash_stream->sidx_parser.status == GST_ISOFF_SIDX_PARSER_FINISHED) { - /* check our position in subsegments */ - if (!stream->downloading_index - && dash_stream->sidx_parser.status == GST_ISOFF_SIDX_PARSER_FINISHED) { - gsize size = gst_buffer_get_size (*chunk); + while (ret == GST_FLOW_OK + && ((available = gst_adapter_available (stream->adapter)) > 0)) { + gboolean advance = FALSE; - GST_LOG_OBJECT (stream->pad, - "Received buffer of size: %" G_GSIZE_FORMAT - " - remaining in subsegment: %" G_GSIZE_FORMAT, size, - dash_stream->sidx_current_remaining); - if (size < dash_stream->sidx_current_remaining) { - dash_stream->sidx_current_remaining -= size; - } else if (size >= dash_stream->sidx_current_remaining) { - if (size > dash_stream->sidx_current_remaining) { - dash_stream->pending_buffer = - _gst_buffer_split (chunk, dash_stream->sidx_current_remaining, - size); + if (available < dash_stream->sidx_current_remaining) { + buffer = gst_adapter_take_buffer (stream->adapter, available); + dash_stream->sidx_current_remaining -= available; + } else { + buffer = + gst_adapter_take_buffer (stream->adapter, + dash_stream->sidx_current_remaining); + dash_stream->sidx_current_remaining = 0; + advance = TRUE; } + ret = gst_adaptive_demux_stream_push_buffer (stream, buffer); + if (advance) { + GstFlowReturn new_ret; + new_ret = + gst_adaptive_demux_stream_advance_fragment (demux, stream, + SIDX_CURRENT_ENTRY (dash_stream)->duration); - gst_dash_demux_stream_advance_subfragment (stream); - ret = (GstFlowReturn) GST_ADAPTIVE_DEMUX_FLOW_SUBSEGMENT_END; + /* only overwrite if it was OK before */ + if (ret == GST_FLOW_OK) + ret = new_ret; + } } + } else { + /* this should be the main header, just push it all */ + ret = + gst_adaptive_demux_stream_push_buffer (stream, + gst_adapter_take_buffer (stream->adapter, + gst_adapter_available (stream->adapter))); } return ret; @@ -1340,6 +1347,4 @@ gst_dash_demux_stream_free (GstAdaptiveDemuxStream * stream) GstDashDemuxStream *dash_stream = (GstDashDemuxStream *) stream; gst_isoff_sidx_parser_clear (&dash_stream->sidx_parser); - if (dash_stream->pending_buffer) - gst_buffer_unref (dash_stream->pending_buffer); } diff --git a/ext/dash/gstdashdemux.h b/ext/dash/gstdashdemux.h index fb1d81dc9b..7abd073ec2 100644 --- a/ext/dash/gstdashdemux.h +++ b/ext/dash/gstdashdemux.h @@ -65,8 +65,6 @@ struct _GstDashDemuxStream GstMediaFragmentInfo current_fragment; - GstBuffer *pending_buffer; - /* index parsing */ GstSidxParser sidx_parser; gsize sidx_current_remaining; diff --git a/ext/hls/gsthlsdemux.c b/ext/hls/gsthlsdemux.c index c4e589dba6..934bebabbb 100644 --- a/ext/hls/gsthlsdemux.c +++ b/ext/hls/gsthlsdemux.c @@ -111,10 +111,10 @@ static gboolean gst_hls_demux_seek (GstAdaptiveDemux * demux, GstEvent * seek); static gboolean gst_hls_demux_start_fragment (GstAdaptiveDemux * demux, GstAdaptiveDemuxStream * stream); -static void gst_hls_demux_finish_fragment (GstAdaptiveDemux * demux, - GstAdaptiveDemuxStream * stream, GstBuffer ** buffer); -static GstFlowReturn gst_hls_demux_chunk_received (GstAdaptiveDemux * demux, - GstAdaptiveDemuxStream * stream, GstBuffer ** chunk); +static GstFlowReturn gst_hls_demux_finish_fragment (GstAdaptiveDemux * demux, + GstAdaptiveDemuxStream * stream); +static GstFlowReturn gst_hls_demux_data_received (GstAdaptiveDemux * demux, + GstAdaptiveDemuxStream * stream); static gboolean gst_hls_demux_stream_has_next_fragment (GstAdaptiveDemuxStream * stream); static GstFlowReturn gst_hls_demux_advance_fragment (GstAdaptiveDemuxStream * @@ -216,7 +216,7 @@ gst_hls_demux_class_init (GstHLSDemuxClass * klass) adaptivedemux_class->start_fragment = gst_hls_demux_start_fragment; adaptivedemux_class->finish_fragment = gst_hls_demux_finish_fragment; - adaptivedemux_class->chunk_received = gst_hls_demux_chunk_received; + adaptivedemux_class->data_received = gst_hls_demux_data_received; GST_DEBUG_CATEGORY_INIT (gst_hls_demux_debug, "hlsdemux", 0, "hlsdemux element"); @@ -289,9 +289,6 @@ gst_hls_demux_change_state (GstElement * element, GstStateChange transition) gst_hls_demux_reset (GST_ADAPTIVE_DEMUX_CAST (demux)); gst_uri_downloader_reset (demux->downloader); break; - case GST_STATE_CHANGE_NULL_TO_READY: - demux->adapter = gst_adapter_new (); - break; default: break; } @@ -302,10 +299,6 @@ gst_hls_demux_change_state (GstElement * element, GstStateChange transition) case GST_STATE_CHANGE_PAUSED_TO_READY: gst_hls_demux_reset (GST_ADAPTIVE_DEMUX_CAST (demux)); break; - case GST_STATE_CHANGE_READY_TO_NULL: - gst_object_unref (demux->adapter); - demux->adapter = NULL; - break; default: break; } @@ -364,11 +357,6 @@ gst_hls_demux_seek (GstAdaptiveDemux * demux, GstEvent * seek) /* properly cleanup pending decryption status */ if (flags & GST_SEEK_FLAG_FLUSH) { - if (hlsdemux->adapter) - gst_adapter_clear (hlsdemux->adapter); - if (hlsdemux->pending_buffer) - gst_buffer_unref (hlsdemux->pending_buffer); - hlsdemux->pending_buffer = NULL; gst_hls_demux_decrypt_end (hlsdemux); } @@ -593,9 +581,9 @@ key_failed: return FALSE; } -static void +static GstFlowReturn gst_hls_demux_finish_fragment (GstAdaptiveDemux * demux, - GstAdaptiveDemuxStream * stream, GstBuffer ** buffer) + GstAdaptiveDemuxStream * stream) { GstHLSDemux *hlsdemux = GST_HLS_DEMUX_CAST (demux); @@ -605,8 +593,8 @@ gst_hls_demux_finish_fragment (GstAdaptiveDemux * demux, /* ideally this should be empty, but this eos might have been * caused by an error on the source element */ GST_DEBUG_OBJECT (demux, "Data still on the adapter when EOS was received" - ": %" G_GSIZE_FORMAT, gst_adapter_available (hlsdemux->adapter)); - gst_adapter_clear (hlsdemux->adapter); + ": %" G_GSIZE_FORMAT, gst_adapter_available (stream->adapter)); + gst_adapter_clear (stream->adapter); /* pending buffer is only used for encrypted streams */ if (stream->last_ret == GST_FLOW_OK) { @@ -621,40 +609,41 @@ gst_hls_demux_finish_fragment (GstAdaptiveDemux * demux, gst_buffer_resize (hlsdemux->pending_buffer, 0, unpadded_size); - *buffer = hlsdemux->pending_buffer; - hlsdemux->pending_buffer = NULL; + gst_adaptive_demux_stream_push_buffer (stream, hlsdemux->pending_buffer); } } else { if (hlsdemux->pending_buffer) gst_buffer_unref (hlsdemux->pending_buffer); hlsdemux->pending_buffer = NULL; } + + return gst_adaptive_demux_stream_advance_fragment (demux, stream, + stream->fragment.duration); } static GstFlowReturn -gst_hls_demux_chunk_received (GstAdaptiveDemux * demux, - GstAdaptiveDemuxStream * stream, GstBuffer ** chunk) +gst_hls_demux_data_received (GstAdaptiveDemux * demux, + GstAdaptiveDemuxStream * stream) { GstHLSDemux *hlsdemux = GST_HLS_DEMUX_CAST (demux); - GstBuffer *buffer = *chunk; + gsize available; + GstBuffer *buffer = NULL; + + available = gst_adapter_available (stream->adapter); /* Is it encrypted? */ if (hlsdemux->current_key) { GError *err = NULL; GstBuffer *tmp_buffer; - gsize available; - - gst_adapter_push (hlsdemux->adapter, buffer); - *chunk = NULL; /* must be a multiple of 16 */ - available = gst_adapter_available (hlsdemux->adapter) & (~0xF); + available = available & (~0xF); if (available == 0) { return GST_FLOW_OK; } - buffer = gst_adapter_take_buffer (hlsdemux->adapter, available); + buffer = gst_adapter_take_buffer (stream->adapter, available); buffer = gst_hls_demux_decrypt_fragment (hlsdemux, buffer, &err); if (buffer == NULL) { GST_ELEMENT_ERROR (demux, STREAM, DECODE, ("Failed to decrypt buffer"), @@ -665,14 +654,15 @@ gst_hls_demux_chunk_received (GstAdaptiveDemux * demux, tmp_buffer = hlsdemux->pending_buffer; hlsdemux->pending_buffer = buffer; - *chunk = tmp_buffer; - } else if (hlsdemux->pending_buffer) { - *chunk = gst_buffer_append (hlsdemux->pending_buffer, buffer); - hlsdemux->pending_buffer = NULL; + buffer = tmp_buffer; + } else { + buffer = gst_adapter_take_buffer (stream->adapter, available); + if (hlsdemux->pending_buffer) { + buffer = gst_buffer_append (hlsdemux->pending_buffer, buffer); + hlsdemux->pending_buffer = NULL; + } } - buffer = *chunk; - if (G_UNLIKELY (hlsdemux->do_typefind && buffer != NULL)) { GstCaps *caps = NULL; GstMapInfo info; @@ -697,6 +687,7 @@ gst_hls_demux_chunk_received (GstAdaptiveDemux * demux, if (buffer_size > (2 * 1024 * 1024)) { GST_ELEMENT_ERROR (hlsdemux, STREAM, TYPE_NOT_FOUND, ("Could not determine type of stream"), (NULL)); + gst_buffer_unref (buffer); return GST_FLOW_NOT_NEGOTIATED; } else { if (hlsdemux->pending_buffer) @@ -704,7 +695,6 @@ gst_hls_demux_chunk_received (GstAdaptiveDemux * demux, gst_buffer_append (buffer, hlsdemux->pending_buffer); else hlsdemux->pending_buffer = buffer; - *chunk = NULL; return GST_FLOW_OK; } } @@ -722,6 +712,9 @@ gst_hls_demux_chunk_received (GstAdaptiveDemux * demux, hlsdemux->do_typefind = FALSE; } + if (buffer) { + return gst_adaptive_demux_stream_push_buffer (stream, buffer); + } return GST_FLOW_OK; } @@ -842,8 +835,6 @@ gst_hls_demux_reset (GstAdaptiveDemux * ademux) demux->client = gst_m3u8_client_new ("", NULL); demux->srcpad_counter = 0; - if (demux->adapter) - gst_adapter_clear (demux->adapter); if (demux->pending_buffer) gst_buffer_unref (demux->pending_buffer); demux->pending_buffer = NULL; diff --git a/ext/hls/gsthlsdemux.h b/ext/hls/gsthlsdemux.h index 20fe98fe0a..c76cd0e568 100644 --- a/ext/hls/gsthlsdemux.h +++ b/ext/hls/gsthlsdemux.h @@ -25,7 +25,6 @@ #define __GST_HLS_DEMUX_H__ #include -#include #include "m3u8.h" #include "gstfragmented.h" #include @@ -96,7 +95,6 @@ struct _GstHLSDemux #endif gchar *current_key; guint8 *current_iv; - GstAdapter *adapter; /* used to accumulate 16 bytes multiple chunks */ GstBuffer *pending_buffer; /* decryption scenario: * the last buffer can only be pushed when * resized, so need to store and wait for diff --git a/gst-libs/gst/adaptivedemux/gstadaptivedemux.c b/gst-libs/gst/adaptivedemux/gstadaptivedemux.c index 2141ede061..826d2e1e83 100644 --- a/gst-libs/gst/adaptivedemux/gstadaptivedemux.c +++ b/gst-libs/gst/adaptivedemux/gstadaptivedemux.c @@ -87,6 +87,11 @@ GST_DEBUG_CATEGORY (adaptivedemux_debug); #define MAX_DOWNLOAD_ERROR_COUNT 3 #define DEFAULT_FAILED_COUNT 3 +enum GstAdaptiveDemuxFlowReturn +{ + GST_ADAPTIVE_DEMUX_FLOW_SWITCH = GST_FLOW_CUSTOM_SUCCESS_2 + 1 +}; + struct _GstAdaptiveDemuxPrivate { GstAdapter *input_adapter; @@ -141,9 +146,6 @@ static GstFlowReturn gst_adaptive_demux_stream_seek (GstAdaptiveDemux * demux, GstAdaptiveDemuxStream * stream, GstClockTime ts); static gboolean gst_adaptive_demux_stream_has_next_fragment (GstAdaptiveDemux * demux, GstAdaptiveDemuxStream * stream); -static GstFlowReturn -gst_adaptive_demux_stream_advance_fragment (GstAdaptiveDemux * demux, - GstAdaptiveDemuxStream * stream); static gboolean gst_adaptive_demux_stream_select_bitrate (GstAdaptiveDemux * demux, GstAdaptiveDemuxStream * stream, guint64 bitrate); static GstFlowReturn @@ -171,6 +173,12 @@ static GstFlowReturn gst_adaptive_demux_combine_flows (GstAdaptiveDemux * static void gst_adaptive_demux_stream_fragment_download_finish (GstAdaptiveDemuxStream * stream, GstFlowReturn ret, GError * err); +static GstFlowReturn +gst_adaptive_demux_stream_data_received_default (GstAdaptiveDemux * demux, + GstAdaptiveDemuxStream * stream); +static GstFlowReturn +gst_adaptive_demux_stream_finish_fragment_default (GstAdaptiveDemux * demux, + GstAdaptiveDemuxStream * stream); /* we can't use G_DEFINE_ABSTRACT_TYPE because we need the klass in the _init @@ -223,6 +231,9 @@ gst_adaptive_demux_class_init (GstAdaptiveDemuxClass * klass) gstelement_class->change_state = gst_adaptive_demux_change_state; gstbin_class->handle_message = gst_adaptive_demux_handle_message; + + klass->data_received = gst_adaptive_demux_stream_data_received_default; + klass->finish_fragment = gst_adaptive_demux_stream_finish_fragment_default; } static void @@ -251,7 +262,6 @@ gst_adaptive_demux_init (GstAdaptiveDemux * demux, g_cond_init (&demux->priv->updates_timed_cond); g_cond_init (&demux->manifest_cond); - g_mutex_init (&demux->manifest_lock); pad_template = @@ -299,8 +309,6 @@ gst_adaptive_demux_change_state (GstElement * element, case GST_STATE_CHANGE_PAUSED_TO_READY: gst_adaptive_demux_reset (demux); break; - case GST_STATE_CHANGE_READY_TO_NULL: - break; default: break; } @@ -704,6 +712,7 @@ gst_adaptive_demux_stream_new (GstAdaptiveDemux * demux, GstPad * pad) gst_segment_init (&stream->segment, GST_FORMAT_TIME); g_cond_init (&stream->fragment_download_cond); g_mutex_init (&stream->fragment_download_lock); + stream->adapter = gst_adapter_new (); demux->next_streams = g_list_append (demux->next_streams, stream); @@ -763,6 +772,9 @@ gst_adaptive_demux_stream_free (GstAdaptiveDemuxStream * stream) } if (stream->pending_caps) gst_caps_unref (stream->pending_caps); + + g_object_unref (stream->adapter); + g_free (stream); } @@ -1110,6 +1122,7 @@ gst_adaptive_demux_stop_tasks (GstAdaptiveDemux * demux) gst_task_join (stream->download_task); stream->download_error_count = 0; stream->need_header = TRUE; + gst_adapter_clear (stream->adapter); } gst_task_join (demux->priv->updates_task); } @@ -1167,6 +1180,9 @@ gst_adaptive_demux_stream_update_current_bitrate (GstAdaptiveDemuxStream * if (bitrate > G_MAXINT) bitrate = G_MAXINT; stream->current_download_rate = bitrate; + GST_DEBUG_OBJECT (stream->pad, "Bitrate: %" G_GUINT64_FORMAT + " (bytes: %" G_GINT64_FORMAT " / %" G_GINT64_FORMAT " microsecs", + bitrate, stream->download_total_bytes, stream->download_total_time); return bitrate; } @@ -1202,54 +1218,13 @@ gst_adaptive_demux_combine_flows (GstAdaptiveDemux * demux) return GST_FLOW_OK; } -static GstFlowReturn -_src_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer) +GstFlowReturn +gst_adaptive_demux_stream_push_buffer (GstAdaptiveDemuxStream * stream, + GstBuffer * buffer) { - GstPad *srcpad = (GstPad *) parent; - GstAdaptiveDemuxStream *stream = gst_pad_get_element_private (srcpad); GstAdaptiveDemux *demux = stream->demux; - GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux); GstFlowReturn ret = GST_FLOW_OK; gboolean discont = FALSE; - gboolean subsegment_end = FALSE; - - if (stream->starting_fragment) { - stream->starting_fragment = FALSE; - if (klass->start_fragment) { - klass->start_fragment (demux, stream); - } - - GST_BUFFER_PTS (buffer) = stream->fragment.timestamp; - - GST_LOG_OBJECT (stream->pad, "set fragment pts=%" GST_TIME_FORMAT, - GST_TIME_ARGS (GST_BUFFER_PTS (buffer))); - - if (GST_BUFFER_PTS_IS_VALID (buffer)) - stream->segment.position = GST_BUFFER_PTS (buffer); - - } else { - GST_BUFFER_PTS (buffer) = GST_CLOCK_TIME_NONE; - } - - /* The subclass might need to decrypt or modify the buffer somehow - * before processing it */ - if (klass->chunk_received) { - ret = klass->chunk_received (demux, stream, &buffer); - if (ret != GST_FLOW_OK) { - if (ret == (GstFlowReturn) GST_ADAPTIVE_DEMUX_FLOW_SUBSEGMENT_END) { - ret = GST_FLOW_OK; - subsegment_end = TRUE; - } else { - if (buffer) - gst_buffer_unref (buffer); - gst_adaptive_demux_stream_fragment_download_finish (stream, ret, NULL); - return ret; - } - } - } - - if (buffer == NULL) - return ret; if (stream->first_fragment_buffer) { if (demux->segment.rate < 0) @@ -1280,15 +1255,6 @@ _src_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer) GST_BUFFER_DURATION (buffer) = GST_CLOCK_TIME_NONE; GST_BUFFER_DTS (buffer) = GST_CLOCK_TIME_NONE; -/* TODO what about live? how to count segments? - GST_BUFFER_OFFSET (buffer) = - gst_mpd_client_get_segment_index (stream->active_stream) - 1; -*/ - - /* accumulate time and size to get this chunk */ - stream->download_total_time += - g_get_monotonic_time () - stream->download_start_time; - stream->download_total_bytes += gst_buffer_get_size (buffer); if (G_UNLIKELY (stream->pending_caps)) { GST_DEBUG_OBJECT (stream->pad, "Setting pending caps: %" GST_PTR_FORMAT, @@ -1310,9 +1276,69 @@ _src_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer) stream->pending_tags = NULL; } - ret = gst_proxy_pad_chain_default (pad, parent, buffer); - stream->download_start_time = g_get_monotonic_time (); - GST_LOG_OBJECT (pad, "Chain res: %d %s", ret, gst_flow_get_name (ret)); + ret = gst_pad_push (stream->pad, buffer); + GST_LOG_OBJECT (stream->pad, "Push result: %d %s", ret, + gst_flow_get_name (ret)); + + return ret; +} + +static GstFlowReturn +gst_adaptive_demux_stream_finish_fragment_default (GstAdaptiveDemux * demux, + GstAdaptiveDemuxStream * stream) +{ + return gst_adaptive_demux_stream_advance_fragment (demux, stream, + stream->fragment.duration); +} + +static GstFlowReturn +gst_adaptive_demux_stream_data_received_default (GstAdaptiveDemux * demux, + GstAdaptiveDemuxStream * stream) +{ + GstBuffer *buffer; + + buffer = gst_adapter_take_buffer (stream->adapter, + gst_adapter_available (stream->adapter)); + return gst_adaptive_demux_stream_push_buffer (stream, buffer); +} + +static GstFlowReturn +_src_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer) +{ + GstPad *srcpad = (GstPad *) parent; + GstAdaptiveDemuxStream *stream = gst_pad_get_element_private (srcpad); + GstAdaptiveDemux *demux = stream->demux; + GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux); + GstFlowReturn ret = GST_FLOW_OK; + + if (stream->starting_fragment) { + stream->starting_fragment = FALSE; + if (klass->start_fragment) { + klass->start_fragment (demux, stream); + } + + GST_BUFFER_PTS (buffer) = stream->fragment.timestamp; + + GST_LOG_OBJECT (stream->pad, "set fragment pts=%" GST_TIME_FORMAT, + GST_TIME_ARGS (GST_BUFFER_PTS (buffer))); + + if (GST_BUFFER_PTS_IS_VALID (buffer)) + stream->segment.position = GST_BUFFER_PTS (buffer); + + } else { + GST_BUFFER_PTS (buffer) = GST_CLOCK_TIME_NONE; + } + + stream->download_total_time += + g_get_monotonic_time () - stream->download_chunk_start_time; + stream->download_total_bytes += gst_buffer_get_size (buffer); + + gst_adapter_push (stream->adapter, buffer); + GST_DEBUG_OBJECT (stream->pad, "Received buffer of size %" G_GSIZE_FORMAT + ". Now %" G_GSIZE_FORMAT " on adapter", gst_buffer_get_size (buffer), + gst_adapter_available (stream->adapter)); + ret = klass->data_received (demux, stream); + stream->download_chunk_start_time = g_get_monotonic_time (); if (ret != GST_FLOW_OK) { if (ret < GST_FLOW_EOS) { @@ -1327,34 +1353,13 @@ _src_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer) } gst_adaptive_demux_stream_fragment_download_finish (stream, ret, NULL); - } else if (subsegment_end) { - /* tell upstream that we are done here */ - ret = GST_FLOW_EOS; + if (ret == (GstFlowReturn) GST_ADAPTIVE_DEMUX_FLOW_SWITCH) + ret = GST_FLOW_EOS; /* return EOS to make the source stop */ } return ret; } -static GstFlowReturn -gst_adaptive_demux_stream_fragment_eos (GstAdaptiveDemuxStream * stream) -{ - GstAdaptiveDemux *demux = stream->demux; - GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (stream->demux); - GstBuffer *buffer = NULL; - GstFlowReturn ret = GST_FLOW_OK; - - if (klass->finish_fragment) { - klass->finish_fragment (demux, stream, &buffer); - if (buffer) { - stream->download_total_time += - g_get_monotonic_time () - stream->download_start_time; - stream->download_total_bytes += gst_buffer_get_size (buffer); - ret = gst_pad_push (stream->pad, buffer); - } - } - return ret; -} - static void gst_adaptive_demux_stream_fragment_download_finish (GstAdaptiveDemuxStream * stream, GstFlowReturn ret, GError * err) @@ -1386,9 +1391,11 @@ _src_event (GstPad * pad, GstObject * parent, GstEvent * event) switch (GST_EVENT_TYPE (event)) { case GST_EVENT_EOS:{ + GstAdaptiveDemuxClass *klass; GstFlowReturn ret; - ret = gst_adaptive_demux_stream_fragment_eos (stream); + klass = GST_ADAPTIVE_DEMUX_GET_CLASS (stream->demux); + ret = klass->finish_fragment (stream->demux, stream); gst_adaptive_demux_stream_fragment_download_finish (stream, ret, NULL); break; } @@ -1629,6 +1636,7 @@ gst_adaptive_demux_stream_download_uri (GstAdaptiveDemux * demux, g_mutex_lock (&stream->fragment_download_lock); if (G_LIKELY (stream->last_ret == GST_FLOW_OK)) { stream->download_start_time = g_get_monotonic_time (); + stream->download_chunk_start_time = g_get_monotonic_time (); g_mutex_unlock (&stream->fragment_download_lock); gst_element_sync_state_with_parent (stream->src); g_mutex_lock (&stream->fragment_download_lock); @@ -1734,20 +1742,7 @@ gst_adaptive_demux_stream_download_fragment (GstAdaptiveDemuxStream * stream) stream->fragment.range_start, stream->fragment.range_end); GST_DEBUG_OBJECT (stream->pad, "Fragment download result: %d %s", stream->last_ret, gst_flow_get_name (stream->last_ret)); - if (ret == GST_FLOW_OK) { - gst_element_post_message (GST_ELEMENT_CAST (demux), - gst_message_new_element (GST_OBJECT_CAST (demux), - gst_structure_new (STATISTICS_MESSAGE_NAME, - "manifest-uri", G_TYPE_STRING, - demux->manifest_uri, "uri", G_TYPE_STRING, - url, "fragment-start-time", - GST_TYPE_CLOCK_TIME, stream->download_start_time, - "fragment-stop-time", GST_TYPE_CLOCK_TIME, - gst_util_get_timestamp (), "fragment-size", G_TYPE_UINT64, - stream->download_total_bytes, "fragment-download-time", - GST_TYPE_CLOCK_TIME, - stream->download_total_time * GST_USECOND, NULL))); - } else { + if (ret != GST_FLOW_OK) { GST_INFO_OBJECT (demux, "No fragment downloaded"); /* TODO check if we are truly stoping */ if (ret != GST_FLOW_ERROR && gst_adaptive_demux_is_live (demux)) { @@ -1758,8 +1753,6 @@ gst_adaptive_demux_stream_download_fragment (GstAdaptiveDemuxStream * stream) } } - - return ret; no_url_error: @@ -1932,44 +1925,6 @@ gst_adaptive_demux_stream_download_loop (GstAdaptiveDemuxStream * stream) GST_MANIFEST_LOCK (demux); } - if (ret == GST_FLOW_OK) { - stream->download_error_count = 0; - g_clear_error (&stream->last_error); - if (GST_CLOCK_TIME_IS_VALID (stream->fragment.duration)) - stream->segment.position += stream->fragment.duration; - - GST_DEBUG_OBJECT (stream->pad, "Advancing to next fragment"); - if (gst_adaptive_demux_stream_has_next_fragment (demux, stream)) { - ret = gst_adaptive_demux_stream_advance_fragment (demux, stream); - GST_DEBUG_OBJECT (stream->pad, "Advanced. Result: %d %s", ret, - gst_flow_get_name (ret)); - - if (ret == GST_FLOW_OK) { - if (gst_adaptive_demux_stream_select_bitrate (demux, stream, - gst_adaptive_demux_stream_update_current_bitrate (stream))) { - stream->need_header = TRUE; - } - - /* the subclass might want to switch pads */ - if (G_UNLIKELY (demux->next_streams)) { - gst_task_stop (stream->download_task); - /* TODO only allow switching streams if other downloads are not ongoing */ - GST_DEBUG_OBJECT (demux, "Subclass wants new pads " - "to do bitrate switching"); - gst_adaptive_demux_expose_streams (demux, FALSE); - gst_adaptive_demux_start_tasks (demux); - ret = GST_FLOW_EOS; - GST_MANIFEST_UNLOCK (demux); - goto end_of_manifest; - } - } - } else { - GST_DEBUG_OBJECT (stream->pad, "No next fragment -> EOS"); - ret = GST_FLOW_EOS; - } - } - - stream->last_ret = ret; switch (ret) { case GST_FLOW_OK: break; /* all is good, let's go */ @@ -2239,15 +2194,80 @@ gst_adaptive_demux_stream_has_next_fragment (GstAdaptiveDemux * demux, return ret; } -static GstFlowReturn +GstFlowReturn gst_adaptive_demux_stream_advance_fragment (GstAdaptiveDemux * demux, - GstAdaptiveDemuxStream * stream) + GstAdaptiveDemuxStream * stream, GstClockTime duration) +{ + GstFlowReturn ret; + + GST_MANIFEST_LOCK (demux); + if (stream->last_ret == GST_FLOW_OK) { + stream->last_ret = + gst_adaptive_demux_stream_advance_fragment_unlocked (demux, stream, + duration); + } + ret = stream->last_ret; + GST_MANIFEST_UNLOCK (demux); + + return ret; +} + +GstFlowReturn +gst_adaptive_demux_stream_advance_fragment_unlocked (GstAdaptiveDemux * demux, + GstAdaptiveDemuxStream * stream, GstClockTime duration) { GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux); + GstFlowReturn ret; g_return_val_if_fail (klass->stream_advance_fragment != NULL, GST_FLOW_ERROR); - return klass->stream_advance_fragment (stream); + stream->download_error_count = 0; + g_clear_error (&stream->last_error); + stream->download_total_time += + g_get_monotonic_time () - stream->download_chunk_start_time; + + /* FIXME - url has no indication of byte ranges for subsegments */ + gst_element_post_message (GST_ELEMENT_CAST (demux), + gst_message_new_element (GST_OBJECT_CAST (demux), + gst_structure_new (STATISTICS_MESSAGE_NAME, + "manifest-uri", G_TYPE_STRING, + demux->manifest_uri, "uri", G_TYPE_STRING, + stream->fragment.uri, "fragment-start-time", + GST_TYPE_CLOCK_TIME, stream->download_start_time, + "fragment-stop-time", GST_TYPE_CLOCK_TIME, + gst_util_get_timestamp (), "fragment-size", G_TYPE_UINT64, + stream->download_total_bytes, "fragment-download-time", + GST_TYPE_CLOCK_TIME, + stream->download_total_time * GST_USECOND, NULL))); + + if (GST_CLOCK_TIME_IS_VALID (duration)) + stream->segment.position += duration; + ret = klass->stream_advance_fragment (stream); + + stream->download_start_time = stream->download_chunk_start_time = + g_get_monotonic_time (); + + if (ret == GST_FLOW_OK) { + if (gst_adaptive_demux_stream_select_bitrate (demux, stream, + gst_adaptive_demux_stream_update_current_bitrate (stream))) { + stream->need_header = TRUE; + gst_adapter_clear (stream->adapter); + ret = GST_ADAPTIVE_DEMUX_FLOW_SWITCH; + } + + /* the subclass might want to switch pads */ + if (G_UNLIKELY (demux->next_streams)) { + gst_task_stop (stream->download_task); + /* TODO only allow switching streams if other downloads are not ongoing */ + GST_DEBUG_OBJECT (demux, "Subclass wants new pads " + "to do bitrate switching"); + gst_adaptive_demux_expose_streams (demux, FALSE); + gst_adaptive_demux_start_tasks (demux); + ret = GST_FLOW_EOS; + } + } + + return ret; } static gboolean diff --git a/gst-libs/gst/adaptivedemux/gstadaptivedemux.h b/gst-libs/gst/adaptivedemux/gstadaptivedemux.h index d650fe47cc..456d093dc0 100644 --- a/gst-libs/gst/adaptivedemux/gstadaptivedemux.h +++ b/gst-libs/gst/adaptivedemux/gstadaptivedemux.h @@ -77,10 +77,6 @@ G_BEGIN_DECLS g_clear_error (&err); \ } G_STMT_END -enum _GstAdaptiveDemuxFlowReturn { - GST_ADAPTIVE_DEMUX_FLOW_SUBSEGMENT_END = GST_FLOW_CUSTOM_SUCCESS + 100 -}; - typedef struct _GstAdaptiveDemuxStreamFragment GstAdaptiveDemuxStreamFragment; typedef struct _GstAdaptiveDemuxStream GstAdaptiveDemuxStream; typedef struct _GstAdaptiveDemux GstAdaptiveDemux; @@ -115,6 +111,8 @@ struct _GstAdaptiveDemuxStream GstSegment segment; + GstAdapter *adapter; + GstCaps *pending_caps; GstEvent *pending_segment; GstTagList *pending_tags; @@ -141,6 +139,7 @@ struct _GstAdaptiveDemuxStream gboolean starting_fragment; gboolean first_fragment_buffer; gint64 download_start_time; + gint64 download_chunk_start_time; gint64 download_total_time; gint64 download_total_bytes; gint current_download_rate; @@ -335,28 +334,23 @@ struct _GstAdaptiveDemuxClass * finish_fragment: * @demux: #GstAdaptiveDemux * @stream: #GstAdaptiveDemuxStream - * @buffer: Pointer to store and pending data that should be pushed. * * Notifies the subclass that a fragment download was finished. - * It can be used to cleanup internal state after a fragment and also - * provides a pointer for the subclass to return some pending data - * that should be pushed before starting the next fragment. This - * covers the use case of finishing the decryption of the last chunk - * of an encrypted fragment. + * It can be used to cleanup internal state after a fragment and + * also push any pending data before moving to the next fragment. */ - void (*finish_fragment) (GstAdaptiveDemux * demux, GstAdaptiveDemuxStream * stream, GstBuffer ** buffer); + GstFlowReturn (*finish_fragment) (GstAdaptiveDemux * demux, GstAdaptiveDemuxStream * stream); /** - * chunk_received: + * data_received: * @demux: #GstAdaptiveDemux * @stream: #GstAdaptiveDemuxStream - * @buffer: Pointer containing the received chunk, also used to return modified data * - * Notifies the subclass that a fragment chunk was downloaded. The subclass can - * modify the buffer and return a new one if needed. Used for decryption. + * Notifies the subclass that a fragment chunk was downloaded. The subclass + * can look at the data at the adapter and modify/push data as desired. * * Returns: #GST_FLOW_OK if successful, #GST_FLOW_ERROR in case of error. */ - GstFlowReturn (*chunk_received) (GstAdaptiveDemux * demux, GstAdaptiveDemuxStream * stream, GstBuffer ** buffer); + GstFlowReturn (*data_received) (GstAdaptiveDemux * demux, GstAdaptiveDemuxStream * stream); /** * get_live_seek_range: @@ -385,6 +379,14 @@ void gst_adaptive_demux_stream_set_tags (GstAdaptiveDemuxStream * stream, GstTagList * tags); void gst_adaptive_demux_stream_fragment_clear (GstAdaptiveDemuxStreamFragment * f); +GstFlowReturn gst_adaptive_demux_stream_push_buffer (GstAdaptiveDemuxStream * stream, GstBuffer * buffer); +GstFlowReturn +gst_adaptive_demux_stream_advance_fragment (GstAdaptiveDemux * demux, + GstAdaptiveDemuxStream * stream, GstClockTime duration); +GstFlowReturn +gst_adaptive_demux_stream_advance_fragment_unlocked (GstAdaptiveDemux * demux, + GstAdaptiveDemuxStream * stream, GstClockTime duration); + G_END_DECLS #endif