From 463f48c1487384bdbc9e7315e2a29c58dd6a9908 Mon Sep 17 00:00:00 2001 From: Jan Schmidt Date: Wed, 13 Jul 2016 23:02:10 +1000 Subject: [PATCH] hlsdemux: add hlsdemux-specific AdaptiveDemuxStream subclass Prepare hlsdemux for more than one single stream. Currently hlsdemux assumes there'll only ever be one stream and most of the stream-specific state is actually in the hlsdemux structure. Add a stream subclass instead and move some stream-specific members there instead. --- ext/hls/gsthlsdemux.c | 135 ++++++++++++++++++++++++++---------------- ext/hls/gsthlsdemux.h | 26 +++++--- 2 files changed, 103 insertions(+), 58 deletions(-) diff --git a/ext/hls/gsthlsdemux.c b/ext/hls/gsthlsdemux.c index 14e8819c93..2146a39c1d 100644 --- a/ext/hls/gsthlsdemux.c +++ b/ext/hls/gsthlsdemux.c @@ -93,6 +93,7 @@ static GstFlowReturn gst_hls_demux_finish_fragment (GstAdaptiveDemux * demux, GstAdaptiveDemuxStream * stream); static GstFlowReturn gst_hls_demux_data_received (GstAdaptiveDemux * demux, GstAdaptiveDemuxStream * stream, GstBuffer * buffer); +static void gst_hls_demux_stream_free (GstAdaptiveDemuxStream * stream); static gboolean gst_hls_demux_stream_has_next_fragment (GstAdaptiveDemuxStream * stream); static GstFlowReturn gst_hls_demux_advance_fragment (GstAdaptiveDemuxStream * @@ -114,7 +115,6 @@ gst_hls_demux_finalize (GObject * obj) GstHLSDemux *demux = GST_HLS_DEMUX (obj); gst_hls_demux_reset (GST_ADAPTIVE_DEMUX_CAST (demux)); - g_object_unref (demux->pending_encrypted_data); gst_m3u8_client_free (demux->client); G_OBJECT_CLASS (parent_class)->finalize (obj); @@ -160,6 +160,7 @@ gst_hls_demux_class_init (GstHLSDemuxClass * klass) adaptivedemux_class->stream_update_fragment_info = gst_hls_demux_update_fragment_info; adaptivedemux_class->stream_select_bitrate = gst_hls_demux_select_bitrate; + adaptivedemux_class->stream_free = gst_hls_demux_stream_free; adaptivedemux_class->start_fragment = gst_hls_demux_start_fragment; adaptivedemux_class->finish_fragment = gst_hls_demux_finish_fragment; @@ -172,8 +173,8 @@ gst_hls_demux_class_init (GstHLSDemuxClass * klass) static void gst_hls_demux_init (GstHLSDemux * demux) { - demux->do_typefind = TRUE; - demux->pending_encrypted_data = gst_adapter_new (); + gst_adaptive_demux_set_stream_struct_size (GST_ADAPTIVE_DEMUX_CAST (demux), + sizeof (GstHLSDemuxStream)); } static GstStateChangeReturn @@ -232,11 +233,18 @@ gst_hls_demux_get_bitrate (GstHLSDemux * hlsdemux) static void gst_hls_demux_clear_pending_data (GstHLSDemux * hlsdemux) { + GstAdaptiveDemux *demux = (GstAdaptiveDemux *) hlsdemux; + GList *walk; + gst_hls_demux_decrypt_end (hlsdemux); - gst_adapter_clear (hlsdemux->pending_encrypted_data); - gst_buffer_replace (&hlsdemux->pending_decrypted_buffer, NULL); - gst_buffer_replace (&hlsdemux->pending_typefind_buffer, NULL); - hlsdemux->current_offset = -1; + for (walk = demux->streams; walk != NULL; walk = walk->next) { + GstHLSDemuxStream *hls_stream = GST_HLS_DEMUX_STREAM_CAST (walk->data); + if (hls_stream->pending_encrypted_data) + gst_adapter_clear (hls_stream->pending_encrypted_data); + gst_buffer_replace (&hls_stream->pending_decrypted_buffer, NULL); + gst_buffer_replace (&hls_stream->pending_typefind_buffer, NULL); + hls_stream->current_offset = -1; + } } static gboolean @@ -284,7 +292,6 @@ gst_hls_demux_seek (GstAdaptiveDemux * demux, GstEvent * seek) return FALSE; } //hlsdemux->discont = TRUE; - hlsdemux->do_typefind = TRUE; gst_hls_demux_change_playlist (hlsdemux, bitrate / ABS (rate), NULL); } else if (rate > -1.0 && rate <= 1.0 && (demux->segment.rate < -1.0 @@ -302,7 +309,6 @@ gst_hls_demux_seek (GstAdaptiveDemux * demux, GstEvent * seek) return FALSE; } //hlsdemux->discont = TRUE; - hlsdemux->do_typefind = TRUE; /* TODO why not continue using the same? that was being used up to now? */ gst_hls_demux_change_playlist (hlsdemux, bitrate, NULL); } @@ -353,7 +359,8 @@ gst_hls_demux_seek (GstAdaptiveDemux * demux, GstEvent * seek) } GST_DEBUG_OBJECT (demux, "seeking to sequence %u", (guint) current_sequence); - hlsdemux->reset_pts = TRUE; + for (walk = demux->streams; walk != NULL; walk = walk->next) + GST_HLS_DEMUX_STREAM_CAST (walk->data)->reset_pts = TRUE; hlsdemux->client->sequence = current_sequence; hlsdemux->client->current_file = current_file ? current_file : hlsdemux->client->current->files; @@ -390,12 +397,18 @@ static gboolean gst_hls_demux_setup_streams (GstAdaptiveDemux * demux) { GstHLSDemux *hlsdemux = GST_HLS_DEMUX_CAST (demux); + GstAdaptiveDemuxStream *stream; + GstHLSDemuxStream *hlsdemux_stream; /* only 1 output supported */ gst_hls_demux_clear_pending_data (hlsdemux); - gst_adaptive_demux_stream_new (demux, gst_hls_demux_create_pad (hlsdemux)); + stream = gst_adaptive_demux_stream_new (demux, + gst_hls_demux_create_pad (hlsdemux)); - hlsdemux->reset_pts = TRUE; + hlsdemux_stream = GST_HLS_DEMUX_STREAM_CAST (stream); + + hlsdemux_stream->do_typefind = TRUE; + hlsdemux_stream->reset_pts = TRUE; return TRUE; } @@ -420,7 +433,9 @@ gst_hls_demux_process_manifest (GstAdaptiveDemux * demux, GstBuffer * buf) if (playlist == NULL) { GST_WARNING_OBJECT (demux, "Error validating first playlist."); return FALSE; - } else if (!gst_m3u8_client_update (hlsdemux->client, playlist)) { + } + + if (!gst_m3u8_client_update (hlsdemux->client, playlist)) { /* In most cases, this will happen if we set a wrong url in the * source element and we have received the 404 HTML response instead of * the playlist */ @@ -535,22 +550,25 @@ key_failed: } } -/* Handles decrypted buffers only */ static GstFlowReturn gst_hls_demux_handle_buffer (GstAdaptiveDemux * demux, GstAdaptiveDemuxStream * stream, GstBuffer * buffer, gboolean force) { + GstHLSDemuxStream *hls_stream = GST_HLS_DEMUX_STREAM_CAST (stream); // FIXME: pass HlsStream into function GstHLSDemux *hlsdemux = GST_HLS_DEMUX_CAST (demux); - if (G_UNLIKELY (hlsdemux->do_typefind && buffer != NULL)) { + if (buffer == NULL) + return GST_FLOW_OK; + + if (G_UNLIKELY (hls_stream->do_typefind)) { GstCaps *caps = NULL; GstMapInfo info; guint buffer_size; GstTypeFindProbability prob = GST_TYPE_FIND_NONE; - if (hlsdemux->pending_typefind_buffer) - buffer = gst_buffer_append (hlsdemux->pending_typefind_buffer, buffer); - hlsdemux->pending_typefind_buffer = NULL; + if (hls_stream->pending_typefind_buffer) + buffer = gst_buffer_append (hls_stream->pending_typefind_buffer, buffer); + hls_stream->pending_typefind_buffer = NULL; gst_buffer_map (buffer, &info, GST_MAP_READ); buffer_size = info.size; @@ -572,26 +590,26 @@ gst_hls_demux_handle_buffer (GstAdaptiveDemux * demux, ("Could not determine type of stream"), (NULL)); gst_buffer_unref (buffer); return GST_FLOW_NOT_NEGOTIATED; - } else { - hlsdemux->pending_typefind_buffer = buffer; - return GST_FLOW_OK; } + + hls_stream->pending_typefind_buffer = buffer; + + return GST_FLOW_OK; } GST_DEBUG_OBJECT (hlsdemux, "Typefind result: %" GST_PTR_FORMAT " prob:%d", caps, prob); gst_adaptive_demux_stream_set_caps (stream, caps); - hlsdemux->do_typefind = FALSE; + hls_stream->do_typefind = FALSE; } - - g_assert (hlsdemux->pending_typefind_buffer == NULL); + g_assert (hls_stream->pending_typefind_buffer == NULL); if (buffer) { buffer = gst_buffer_make_writable (buffer); - GST_BUFFER_OFFSET (buffer) = hlsdemux->current_offset; - hlsdemux->current_offset += gst_buffer_get_size (buffer); - GST_BUFFER_OFFSET_END (buffer) = hlsdemux->current_offset; + GST_BUFFER_OFFSET (buffer) = hls_stream->current_offset; + hls_stream->current_offset += gst_buffer_get_size (buffer); + GST_BUFFER_OFFSET_END (buffer) = hls_stream->current_offset; return gst_adaptive_demux_stream_push_buffer (stream, buffer); } return GST_FLOW_OK; @@ -602,31 +620,32 @@ gst_hls_demux_finish_fragment (GstAdaptiveDemux * demux, GstAdaptiveDemuxStream * stream) { GstHLSDemux *hlsdemux = GST_HLS_DEMUX_CAST (demux); + GstHLSDemuxStream *hls_stream = GST_HLS_DEMUX_STREAM_CAST (stream); // FIXME: pass HlsStream into function GstFlowReturn ret = GST_FLOW_OK; if (hlsdemux->current_key) gst_hls_demux_decrypt_end (hlsdemux); if (stream->last_ret == GST_FLOW_OK) { - if (hlsdemux->pending_decrypted_buffer) { + if (hls_stream->pending_decrypted_buffer) { if (hlsdemux->current_key) { GstMapInfo info; gssize unpadded_size; /* Handle pkcs7 unpadding here */ - gst_buffer_map (hlsdemux->pending_decrypted_buffer, &info, + gst_buffer_map (hls_stream->pending_decrypted_buffer, &info, GST_MAP_READ); unpadded_size = info.size - info.data[info.size - 1]; - gst_buffer_unmap (hlsdemux->pending_decrypted_buffer, &info); + gst_buffer_unmap (hls_stream->pending_decrypted_buffer, &info); - gst_buffer_resize (hlsdemux->pending_decrypted_buffer, 0, + gst_buffer_resize (hls_stream->pending_decrypted_buffer, 0, unpadded_size); } ret = gst_hls_demux_handle_buffer (demux, stream, - hlsdemux->pending_decrypted_buffer, TRUE); - hlsdemux->pending_decrypted_buffer = NULL; + hls_stream->pending_decrypted_buffer, TRUE); + hls_stream->pending_decrypted_buffer = NULL; } } gst_hls_demux_clear_pending_data (hlsdemux); @@ -642,9 +661,10 @@ gst_hls_demux_data_received (GstAdaptiveDemux * demux, GstAdaptiveDemuxStream * stream, GstBuffer * buffer) { GstHLSDemux *hlsdemux = GST_HLS_DEMUX_CAST (demux); + GstHLSDemuxStream *hls_stream = GST_HLS_DEMUX_STREAM_CAST (stream); - if (hlsdemux->current_offset == -1) - hlsdemux->current_offset = + if (hls_stream->current_offset == -1) + hls_stream->current_offset = GST_BUFFER_OFFSET_IS_VALID (buffer) ? GST_BUFFER_OFFSET (buffer) : 0; /* Is it encrypted? */ @@ -653,17 +673,20 @@ gst_hls_demux_data_received (GstAdaptiveDemux * demux, gsize size; GstBuffer *tmp_buffer; - gst_adapter_push (hlsdemux->pending_encrypted_data, buffer); - size = gst_adapter_available (hlsdemux->pending_encrypted_data); + if (hls_stream->pending_encrypted_data == NULL) + hls_stream->pending_encrypted_data = gst_adapter_new (); + + gst_adapter_push (hls_stream->pending_encrypted_data, buffer); + size = gst_adapter_available (hls_stream->pending_encrypted_data); /* must be a multiple of 16 */ - size = size & (~0xF); + size &= (~0xF); if (size == 0) { return GST_FLOW_OK; } - buffer = gst_adapter_take_buffer (hlsdemux->pending_encrypted_data, size); + buffer = gst_adapter_take_buffer (hls_stream->pending_encrypted_data, size); buffer = gst_hls_demux_decrypt_fragment (hlsdemux, buffer, &err); if (buffer == NULL) { GST_ELEMENT_ERROR (demux, STREAM, DECODE, ("Failed to decrypt buffer"), @@ -672,14 +695,26 @@ gst_hls_demux_data_received (GstAdaptiveDemux * demux, return GST_FLOW_ERROR; } - tmp_buffer = hlsdemux->pending_decrypted_buffer; - hlsdemux->pending_decrypted_buffer = buffer; + tmp_buffer = hls_stream->pending_decrypted_buffer; + hls_stream->pending_decrypted_buffer = buffer; buffer = tmp_buffer; } return gst_hls_demux_handle_buffer (demux, stream, buffer, FALSE); } +static void +gst_hls_demux_stream_free (GstAdaptiveDemuxStream * stream) +{ + GstHLSDemuxStream *hls_stream = GST_HLS_DEMUX_STREAM_CAST (stream); + + if (hls_stream->pending_encrypted_data) + g_object_unref (hls_stream->pending_encrypted_data); + + gst_buffer_replace (&hls_stream->pending_decrypted_buffer, NULL); + gst_buffer_replace (&hls_stream->pending_typefind_buffer, NULL); +} + static gboolean gst_hls_demux_stream_has_next_fragment (GstAdaptiveDemuxStream * stream) { @@ -692,17 +727,19 @@ gst_hls_demux_stream_has_next_fragment (GstAdaptiveDemuxStream * stream) static GstFlowReturn gst_hls_demux_advance_fragment (GstAdaptiveDemuxStream * stream) { + GstHLSDemuxStream *hlsdemux_stream = GST_HLS_DEMUX_STREAM_CAST (stream); GstHLSDemux *hlsdemux = GST_HLS_DEMUX_CAST (stream->demux); gst_m3u8_client_advance_fragment (hlsdemux->client, stream->demux->segment.rate > 0); - hlsdemux->reset_pts = FALSE; + hlsdemux_stream->reset_pts = FALSE; return GST_FLOW_OK; } static GstFlowReturn gst_hls_demux_update_fragment_info (GstAdaptiveDemuxStream * stream) { + GstHLSDemuxStream *hlsdemux_stream = GST_HLS_DEMUX_STREAM_CAST (stream); GstHLSDemux *hlsdemux = GST_HLS_DEMUX_CAST (stream->demux); gchar *next_fragment_uri; GstClockTime duration; @@ -723,7 +760,8 @@ gst_hls_demux_update_fragment_info (GstAdaptiveDemuxStream * stream) discont = TRUE; /* set up our source for download */ - if (hlsdemux->reset_pts || discont || stream->demux->segment.rate < 0.0) { + if (hlsdemux_stream->reset_pts || discont + || stream->demux->segment.rate < 0.0) { stream->fragment.timestamp = timestamp; } else { stream->fragment.timestamp = GST_CLOCK_TIME_NONE; @@ -775,9 +813,6 @@ gst_hls_demux_reset (GstAdaptiveDemux * ademux) { GstHLSDemux *demux = GST_HLS_DEMUX_CAST (ademux); - demux->do_typefind = TRUE; - demux->reset_pts = TRUE; - g_free (demux->key_url); demux->key_url = NULL; @@ -793,8 +828,9 @@ gst_hls_demux_reset (GstAdaptiveDemux * ademux) demux->client = gst_m3u8_client_new ("", NULL); demux->srcpad_counter = 0; + gst_hls_demux_clear_pending_data (demux); - gst_buffer_replace (&demux->pending_typefind_buffer, NULL); + if (demux->current_key) { g_free (demux->current_key); demux->current_key = NULL; @@ -1099,9 +1135,6 @@ retry_failover_protection: return gst_hls_demux_change_playlist (demux, new_bandwidth - 1, changed); } - /* Force typefinding since we might have changed media type */ - demux->do_typefind = TRUE; - return TRUE; } @@ -1137,7 +1170,7 @@ decrypt_fragment (GstHLSDemux * demux, gsize length, } static void -gst_hls_demux_decrypt_end (GstHLSDemux * demux) +gst_hls_stream_decrypt_end (GstHLSDemuxStream * stream) { EVP_CIPHER_CTX_cleanup (&demux->aes_ctx); } diff --git a/ext/hls/gsthlsdemux.h b/ext/hls/gsthlsdemux.h index f0c530ee2c..da02529a53 100644 --- a/ext/hls/gsthlsdemux.h +++ b/ext/hls/gsthlsdemux.h @@ -38,6 +38,7 @@ #endif G_BEGIN_DECLS + #define GST_TYPE_HLS_DEMUX \ (gst_hls_demux_get_type()) #define GST_HLS_DEMUX(obj) \ @@ -52,8 +53,26 @@ G_BEGIN_DECLS (G_TYPE_INSTANCE_GET_CLASS ((obj),GST_TYPE_HLS_DEMUX,GstHLSDemuxClass)) #define GST_HLS_DEMUX_CAST(obj) \ ((GstHLSDemux *)obj) + typedef struct _GstHLSDemux GstHLSDemux; typedef struct _GstHLSDemuxClass GstHLSDemuxClass; +typedef struct _GstHLSDemuxStream GstHLSDemuxStream; + +#define GST_HLS_DEMUX_STREAM_CAST(stream) ((GstHLSDemuxStream *)(stream)) + +struct _GstHLSDemuxStream +{ + GstAdaptiveDemux adaptive_demux_stream; + + gboolean do_typefind; /* Whether we need to typefind the next buffer */ + GstBuffer *pending_typefind_buffer; /* for collecting data until typefind succeeds */ + + GstAdapter *pending_encrypted_data; /* for chunking data into 16 byte multiples for decryption */ + GstBuffer *pending_decrypted_buffer; /* last decrypted buffer for pkcs7 unpadding. + We only know that it is the last at EOS */ + guint64 current_offset; /* offset we're currently at */ + gboolean reset_pts; +}; /** * GstHLSDemux: @@ -68,7 +87,6 @@ struct _GstHLSDemux gchar *uri; /* Original playlist URI */ GstM3U8Client *client; /* M3U8 client */ - gboolean do_typefind; /* Whether we need to typefind the next buffer */ /* Cache for the last key */ gchar *key_url; @@ -84,12 +102,6 @@ struct _GstHLSDemux #endif gchar *current_key; guint8 *current_iv; - GstAdapter *pending_encrypted_data; /* for chunking data into 16 byte multiples for decryption */ - GstBuffer *pending_decrypted_buffer; /* last decrypted buffer for pkcs7 unpadding. - We only know that it is the last at EOS */ - GstBuffer *pending_typefind_buffer; /* for collecting data until typefind succeeds */ - guint64 current_offset; /* offset we're currently at */ - gboolean reset_pts; }; struct _GstHLSDemuxClass