hlsdemux: add hlsdemux-specific AdaptiveDemuxStream subclass

Prepare hlsdemux for more than one single stream. Currently hlsdemux
assumes there'll only ever be one stream and most of the stream-specific
state is actually in the hlsdemux structure. Add a stream subclass
instead and move some stream-specific members there instead.
This commit is contained in:
Jan Schmidt 2016-07-13 23:02:10 +10:00
parent 585e60c4ab
commit 463f48c148
2 changed files with 103 additions and 58 deletions

View file

@ -93,6 +93,7 @@ static GstFlowReturn gst_hls_demux_finish_fragment (GstAdaptiveDemux * demux,
GstAdaptiveDemuxStream * stream);
static GstFlowReturn gst_hls_demux_data_received (GstAdaptiveDemux * demux,
GstAdaptiveDemuxStream * stream, GstBuffer * buffer);
static void gst_hls_demux_stream_free (GstAdaptiveDemuxStream * stream);
static gboolean gst_hls_demux_stream_has_next_fragment (GstAdaptiveDemuxStream *
stream);
static GstFlowReturn gst_hls_demux_advance_fragment (GstAdaptiveDemuxStream *
@ -114,7 +115,6 @@ gst_hls_demux_finalize (GObject * obj)
GstHLSDemux *demux = GST_HLS_DEMUX (obj);
gst_hls_demux_reset (GST_ADAPTIVE_DEMUX_CAST (demux));
g_object_unref (demux->pending_encrypted_data);
gst_m3u8_client_free (demux->client);
G_OBJECT_CLASS (parent_class)->finalize (obj);
@ -160,6 +160,7 @@ gst_hls_demux_class_init (GstHLSDemuxClass * klass)
adaptivedemux_class->stream_update_fragment_info =
gst_hls_demux_update_fragment_info;
adaptivedemux_class->stream_select_bitrate = gst_hls_demux_select_bitrate;
adaptivedemux_class->stream_free = gst_hls_demux_stream_free;
adaptivedemux_class->start_fragment = gst_hls_demux_start_fragment;
adaptivedemux_class->finish_fragment = gst_hls_demux_finish_fragment;
@ -172,8 +173,8 @@ gst_hls_demux_class_init (GstHLSDemuxClass * klass)
static void
gst_hls_demux_init (GstHLSDemux * demux)
{
demux->do_typefind = TRUE;
demux->pending_encrypted_data = gst_adapter_new ();
gst_adaptive_demux_set_stream_struct_size (GST_ADAPTIVE_DEMUX_CAST (demux),
sizeof (GstHLSDemuxStream));
}
static GstStateChangeReturn
@ -232,11 +233,18 @@ gst_hls_demux_get_bitrate (GstHLSDemux * hlsdemux)
static void
gst_hls_demux_clear_pending_data (GstHLSDemux * hlsdemux)
{
GstAdaptiveDemux *demux = (GstAdaptiveDemux *) hlsdemux;
GList *walk;
gst_hls_demux_decrypt_end (hlsdemux);
gst_adapter_clear (hlsdemux->pending_encrypted_data);
gst_buffer_replace (&hlsdemux->pending_decrypted_buffer, NULL);
gst_buffer_replace (&hlsdemux->pending_typefind_buffer, NULL);
hlsdemux->current_offset = -1;
for (walk = demux->streams; walk != NULL; walk = walk->next) {
GstHLSDemuxStream *hls_stream = GST_HLS_DEMUX_STREAM_CAST (walk->data);
if (hls_stream->pending_encrypted_data)
gst_adapter_clear (hls_stream->pending_encrypted_data);
gst_buffer_replace (&hls_stream->pending_decrypted_buffer, NULL);
gst_buffer_replace (&hls_stream->pending_typefind_buffer, NULL);
hls_stream->current_offset = -1;
}
}
static gboolean
@ -284,7 +292,6 @@ gst_hls_demux_seek (GstAdaptiveDemux * demux, GstEvent * seek)
return FALSE;
}
//hlsdemux->discont = TRUE;
hlsdemux->do_typefind = TRUE;
gst_hls_demux_change_playlist (hlsdemux, bitrate / ABS (rate), NULL);
} else if (rate > -1.0 && rate <= 1.0 && (demux->segment.rate < -1.0
@ -302,7 +309,6 @@ gst_hls_demux_seek (GstAdaptiveDemux * demux, GstEvent * seek)
return FALSE;
}
//hlsdemux->discont = TRUE;
hlsdemux->do_typefind = TRUE;
/* TODO why not continue using the same? that was being used up to now? */
gst_hls_demux_change_playlist (hlsdemux, bitrate, NULL);
}
@ -353,7 +359,8 @@ gst_hls_demux_seek (GstAdaptiveDemux * demux, GstEvent * seek)
}
GST_DEBUG_OBJECT (demux, "seeking to sequence %u", (guint) current_sequence);
hlsdemux->reset_pts = TRUE;
for (walk = demux->streams; walk != NULL; walk = walk->next)
GST_HLS_DEMUX_STREAM_CAST (walk->data)->reset_pts = TRUE;
hlsdemux->client->sequence = current_sequence;
hlsdemux->client->current_file =
current_file ? current_file : hlsdemux->client->current->files;
@ -390,12 +397,18 @@ static gboolean
gst_hls_demux_setup_streams (GstAdaptiveDemux * demux)
{
GstHLSDemux *hlsdemux = GST_HLS_DEMUX_CAST (demux);
GstAdaptiveDemuxStream *stream;
GstHLSDemuxStream *hlsdemux_stream;
/* only 1 output supported */
gst_hls_demux_clear_pending_data (hlsdemux);
gst_adaptive_demux_stream_new (demux, gst_hls_demux_create_pad (hlsdemux));
stream = gst_adaptive_demux_stream_new (demux,
gst_hls_demux_create_pad (hlsdemux));
hlsdemux->reset_pts = TRUE;
hlsdemux_stream = GST_HLS_DEMUX_STREAM_CAST (stream);
hlsdemux_stream->do_typefind = TRUE;
hlsdemux_stream->reset_pts = TRUE;
return TRUE;
}
@ -420,7 +433,9 @@ gst_hls_demux_process_manifest (GstAdaptiveDemux * demux, GstBuffer * buf)
if (playlist == NULL) {
GST_WARNING_OBJECT (demux, "Error validating first playlist.");
return FALSE;
} else if (!gst_m3u8_client_update (hlsdemux->client, playlist)) {
}
if (!gst_m3u8_client_update (hlsdemux->client, playlist)) {
/* In most cases, this will happen if we set a wrong url in the
* source element and we have received the 404 HTML response instead of
* the playlist */
@ -535,22 +550,25 @@ key_failed:
}
}
/* Handles decrypted buffers only */
static GstFlowReturn
gst_hls_demux_handle_buffer (GstAdaptiveDemux * demux,
GstAdaptiveDemuxStream * stream, GstBuffer * buffer, gboolean force)
{
GstHLSDemuxStream *hls_stream = GST_HLS_DEMUX_STREAM_CAST (stream); // FIXME: pass HlsStream into function
GstHLSDemux *hlsdemux = GST_HLS_DEMUX_CAST (demux);
if (G_UNLIKELY (hlsdemux->do_typefind && buffer != NULL)) {
if (buffer == NULL)
return GST_FLOW_OK;
if (G_UNLIKELY (hls_stream->do_typefind)) {
GstCaps *caps = NULL;
GstMapInfo info;
guint buffer_size;
GstTypeFindProbability prob = GST_TYPE_FIND_NONE;
if (hlsdemux->pending_typefind_buffer)
buffer = gst_buffer_append (hlsdemux->pending_typefind_buffer, buffer);
hlsdemux->pending_typefind_buffer = NULL;
if (hls_stream->pending_typefind_buffer)
buffer = gst_buffer_append (hls_stream->pending_typefind_buffer, buffer);
hls_stream->pending_typefind_buffer = NULL;
gst_buffer_map (buffer, &info, GST_MAP_READ);
buffer_size = info.size;
@ -572,26 +590,26 @@ gst_hls_demux_handle_buffer (GstAdaptiveDemux * demux,
("Could not determine type of stream"), (NULL));
gst_buffer_unref (buffer);
return GST_FLOW_NOT_NEGOTIATED;
} else {
hlsdemux->pending_typefind_buffer = buffer;
return GST_FLOW_OK;
}
hls_stream->pending_typefind_buffer = buffer;
return GST_FLOW_OK;
}
GST_DEBUG_OBJECT (hlsdemux, "Typefind result: %" GST_PTR_FORMAT " prob:%d",
caps, prob);
gst_adaptive_demux_stream_set_caps (stream, caps);
hlsdemux->do_typefind = FALSE;
hls_stream->do_typefind = FALSE;
}
g_assert (hlsdemux->pending_typefind_buffer == NULL);
g_assert (hls_stream->pending_typefind_buffer == NULL);
if (buffer) {
buffer = gst_buffer_make_writable (buffer);
GST_BUFFER_OFFSET (buffer) = hlsdemux->current_offset;
hlsdemux->current_offset += gst_buffer_get_size (buffer);
GST_BUFFER_OFFSET_END (buffer) = hlsdemux->current_offset;
GST_BUFFER_OFFSET (buffer) = hls_stream->current_offset;
hls_stream->current_offset += gst_buffer_get_size (buffer);
GST_BUFFER_OFFSET_END (buffer) = hls_stream->current_offset;
return gst_adaptive_demux_stream_push_buffer (stream, buffer);
}
return GST_FLOW_OK;
@ -602,31 +620,32 @@ gst_hls_demux_finish_fragment (GstAdaptiveDemux * demux,
GstAdaptiveDemuxStream * stream)
{
GstHLSDemux *hlsdemux = GST_HLS_DEMUX_CAST (demux);
GstHLSDemuxStream *hls_stream = GST_HLS_DEMUX_STREAM_CAST (stream); // FIXME: pass HlsStream into function
GstFlowReturn ret = GST_FLOW_OK;
if (hlsdemux->current_key)
gst_hls_demux_decrypt_end (hlsdemux);
if (stream->last_ret == GST_FLOW_OK) {
if (hlsdemux->pending_decrypted_buffer) {
if (hls_stream->pending_decrypted_buffer) {
if (hlsdemux->current_key) {
GstMapInfo info;
gssize unpadded_size;
/* Handle pkcs7 unpadding here */
gst_buffer_map (hlsdemux->pending_decrypted_buffer, &info,
gst_buffer_map (hls_stream->pending_decrypted_buffer, &info,
GST_MAP_READ);
unpadded_size = info.size - info.data[info.size - 1];
gst_buffer_unmap (hlsdemux->pending_decrypted_buffer, &info);
gst_buffer_unmap (hls_stream->pending_decrypted_buffer, &info);
gst_buffer_resize (hlsdemux->pending_decrypted_buffer, 0,
gst_buffer_resize (hls_stream->pending_decrypted_buffer, 0,
unpadded_size);
}
ret =
gst_hls_demux_handle_buffer (demux, stream,
hlsdemux->pending_decrypted_buffer, TRUE);
hlsdemux->pending_decrypted_buffer = NULL;
hls_stream->pending_decrypted_buffer, TRUE);
hls_stream->pending_decrypted_buffer = NULL;
}
}
gst_hls_demux_clear_pending_data (hlsdemux);
@ -642,9 +661,10 @@ gst_hls_demux_data_received (GstAdaptiveDemux * demux,
GstAdaptiveDemuxStream * stream, GstBuffer * buffer)
{
GstHLSDemux *hlsdemux = GST_HLS_DEMUX_CAST (demux);
GstHLSDemuxStream *hls_stream = GST_HLS_DEMUX_STREAM_CAST (stream);
if (hlsdemux->current_offset == -1)
hlsdemux->current_offset =
if (hls_stream->current_offset == -1)
hls_stream->current_offset =
GST_BUFFER_OFFSET_IS_VALID (buffer) ? GST_BUFFER_OFFSET (buffer) : 0;
/* Is it encrypted? */
@ -653,17 +673,20 @@ gst_hls_demux_data_received (GstAdaptiveDemux * demux,
gsize size;
GstBuffer *tmp_buffer;
gst_adapter_push (hlsdemux->pending_encrypted_data, buffer);
size = gst_adapter_available (hlsdemux->pending_encrypted_data);
if (hls_stream->pending_encrypted_data == NULL)
hls_stream->pending_encrypted_data = gst_adapter_new ();
gst_adapter_push (hls_stream->pending_encrypted_data, buffer);
size = gst_adapter_available (hls_stream->pending_encrypted_data);
/* must be a multiple of 16 */
size = size & (~0xF);
size &= (~0xF);
if (size == 0) {
return GST_FLOW_OK;
}
buffer = gst_adapter_take_buffer (hlsdemux->pending_encrypted_data, size);
buffer = gst_adapter_take_buffer (hls_stream->pending_encrypted_data, size);
buffer = gst_hls_demux_decrypt_fragment (hlsdemux, buffer, &err);
if (buffer == NULL) {
GST_ELEMENT_ERROR (demux, STREAM, DECODE, ("Failed to decrypt buffer"),
@ -672,14 +695,26 @@ gst_hls_demux_data_received (GstAdaptiveDemux * demux,
return GST_FLOW_ERROR;
}
tmp_buffer = hlsdemux->pending_decrypted_buffer;
hlsdemux->pending_decrypted_buffer = buffer;
tmp_buffer = hls_stream->pending_decrypted_buffer;
hls_stream->pending_decrypted_buffer = buffer;
buffer = tmp_buffer;
}
return gst_hls_demux_handle_buffer (demux, stream, buffer, FALSE);
}
static void
gst_hls_demux_stream_free (GstAdaptiveDemuxStream * stream)
{
GstHLSDemuxStream *hls_stream = GST_HLS_DEMUX_STREAM_CAST (stream);
if (hls_stream->pending_encrypted_data)
g_object_unref (hls_stream->pending_encrypted_data);
gst_buffer_replace (&hls_stream->pending_decrypted_buffer, NULL);
gst_buffer_replace (&hls_stream->pending_typefind_buffer, NULL);
}
static gboolean
gst_hls_demux_stream_has_next_fragment (GstAdaptiveDemuxStream * stream)
{
@ -692,17 +727,19 @@ gst_hls_demux_stream_has_next_fragment (GstAdaptiveDemuxStream * stream)
static GstFlowReturn
gst_hls_demux_advance_fragment (GstAdaptiveDemuxStream * stream)
{
GstHLSDemuxStream *hlsdemux_stream = GST_HLS_DEMUX_STREAM_CAST (stream);
GstHLSDemux *hlsdemux = GST_HLS_DEMUX_CAST (stream->demux);
gst_m3u8_client_advance_fragment (hlsdemux->client,
stream->demux->segment.rate > 0);
hlsdemux->reset_pts = FALSE;
hlsdemux_stream->reset_pts = FALSE;
return GST_FLOW_OK;
}
static GstFlowReturn
gst_hls_demux_update_fragment_info (GstAdaptiveDemuxStream * stream)
{
GstHLSDemuxStream *hlsdemux_stream = GST_HLS_DEMUX_STREAM_CAST (stream);
GstHLSDemux *hlsdemux = GST_HLS_DEMUX_CAST (stream->demux);
gchar *next_fragment_uri;
GstClockTime duration;
@ -723,7 +760,8 @@ gst_hls_demux_update_fragment_info (GstAdaptiveDemuxStream * stream)
discont = TRUE;
/* set up our source for download */
if (hlsdemux->reset_pts || discont || stream->demux->segment.rate < 0.0) {
if (hlsdemux_stream->reset_pts || discont
|| stream->demux->segment.rate < 0.0) {
stream->fragment.timestamp = timestamp;
} else {
stream->fragment.timestamp = GST_CLOCK_TIME_NONE;
@ -775,9 +813,6 @@ gst_hls_demux_reset (GstAdaptiveDemux * ademux)
{
GstHLSDemux *demux = GST_HLS_DEMUX_CAST (ademux);
demux->do_typefind = TRUE;
demux->reset_pts = TRUE;
g_free (demux->key_url);
demux->key_url = NULL;
@ -793,8 +828,9 @@ gst_hls_demux_reset (GstAdaptiveDemux * ademux)
demux->client = gst_m3u8_client_new ("", NULL);
demux->srcpad_counter = 0;
gst_hls_demux_clear_pending_data (demux);
gst_buffer_replace (&demux->pending_typefind_buffer, NULL);
if (demux->current_key) {
g_free (demux->current_key);
demux->current_key = NULL;
@ -1099,9 +1135,6 @@ retry_failover_protection:
return gst_hls_demux_change_playlist (demux, new_bandwidth - 1, changed);
}
/* Force typefinding since we might have changed media type */
demux->do_typefind = TRUE;
return TRUE;
}
@ -1137,7 +1170,7 @@ decrypt_fragment (GstHLSDemux * demux, gsize length,
}
static void
gst_hls_demux_decrypt_end (GstHLSDemux * demux)
gst_hls_stream_decrypt_end (GstHLSDemuxStream * stream)
{
EVP_CIPHER_CTX_cleanup (&demux->aes_ctx);
}

View file

@ -38,6 +38,7 @@
#endif
G_BEGIN_DECLS
#define GST_TYPE_HLS_DEMUX \
(gst_hls_demux_get_type())
#define GST_HLS_DEMUX(obj) \
@ -52,8 +53,26 @@ G_BEGIN_DECLS
(G_TYPE_INSTANCE_GET_CLASS ((obj),GST_TYPE_HLS_DEMUX,GstHLSDemuxClass))
#define GST_HLS_DEMUX_CAST(obj) \
((GstHLSDemux *)obj)
typedef struct _GstHLSDemux GstHLSDemux;
typedef struct _GstHLSDemuxClass GstHLSDemuxClass;
typedef struct _GstHLSDemuxStream GstHLSDemuxStream;
#define GST_HLS_DEMUX_STREAM_CAST(stream) ((GstHLSDemuxStream *)(stream))
struct _GstHLSDemuxStream
{
GstAdaptiveDemux adaptive_demux_stream;
gboolean do_typefind; /* Whether we need to typefind the next buffer */
GstBuffer *pending_typefind_buffer; /* for collecting data until typefind succeeds */
GstAdapter *pending_encrypted_data; /* for chunking data into 16 byte multiples for decryption */
GstBuffer *pending_decrypted_buffer; /* last decrypted buffer for pkcs7 unpadding.
We only know that it is the last at EOS */
guint64 current_offset; /* offset we're currently at */
gboolean reset_pts;
};
/**
* GstHLSDemux:
@ -68,7 +87,6 @@ struct _GstHLSDemux
gchar *uri; /* Original playlist URI */
GstM3U8Client *client; /* M3U8 client */
gboolean do_typefind; /* Whether we need to typefind the next buffer */
/* Cache for the last key */
gchar *key_url;
@ -84,12 +102,6 @@ struct _GstHLSDemux
#endif
gchar *current_key;
guint8 *current_iv;
GstAdapter *pending_encrypted_data; /* for chunking data into 16 byte multiples for decryption */
GstBuffer *pending_decrypted_buffer; /* last decrypted buffer for pkcs7 unpadding.
We only know that it is the last at EOS */
GstBuffer *pending_typefind_buffer; /* for collecting data until typefind succeeds */
guint64 current_offset; /* offset we're currently at */
gboolean reset_pts;
};
struct _GstHLSDemuxClass