mssdemux: initial implementation of the pad loop

The loop will fetch a new fragment and push on its srcpad. Each
stream has its own loop
This commit is contained in:
Thiago Santos 2012-11-16 17:30:12 -03:00
parent 6b63a7dc81
commit 7ceb023682
2 changed files with 87 additions and 5 deletions

View file

@ -69,6 +69,8 @@ gst_mss_demux_change_state (GstElement * element, GstStateChange transition);
static GstFlowReturn gst_mss_demux_chain (GstPad * pad, GstBuffer * buffer);
static GstFlowReturn gst_mss_demux_event (GstPad * pad, GstEvent * event);
static void gst_mss_demux_stream_loop (GstMssDemuxStream * stream);
static void gst_mss_demux_process_manifest (GstMssDemux * mssdemux);
static void
@ -119,6 +121,52 @@ gst_mss_demux_init (GstMssDemux * mssdemux, GstMssDemuxClass * klass)
gst_element_add_pad (GST_ELEMENT_CAST (mssdemux), mssdemux->sinkpad);
}
static GstMssDemuxStream *
gst_mss_demux_stream_new (GstMssDemux * mssdemux,
GstMssManifestStream * manifeststream, GstPad * srcpad)
{
GstMssDemuxStream *stream;
stream = g_new0 (GstMssDemuxStream, 1);
stream->downloader = gst_uri_downloader_new ();
/* Streaming task */
g_static_rec_mutex_init (&stream->stream_lock);
stream->stream_task =
gst_task_create ((GstTaskFunction) gst_mss_demux_stream_loop, stream);
gst_task_set_lock (stream->stream_task, &stream->stream_lock);
stream->pad = srcpad;
stream->manifest_stream = manifeststream;
stream->parent = mssdemux;
return stream;
}
static void
gst_mss_demux_stream_free (GstMssDemuxStream * stream)
{
if (stream->stream_task) {
if (GST_TASK_STATE (stream->stream_task) != GST_TASK_STOPPED) {
GST_DEBUG_OBJECT (stream->parent, "Leaving streaming task %s:%s",
GST_DEBUG_PAD_NAME (stream->pad));
gst_task_stop (stream->stream_task);
g_static_rec_mutex_lock (&stream->stream_lock);
g_static_rec_mutex_unlock (&stream->stream_lock);
gst_task_join (stream->stream_task);
}
gst_object_unref (stream->stream_task);
g_static_rec_mutex_free (&stream->stream_lock);
stream->stream_task = NULL;
}
if (stream->downloader != NULL) {
g_object_unref (stream->downloader);
stream->downloader = NULL;
}
g_free (stream);
}
static void
gst_mss_demux_reset (GstMssDemux * mssdemux)
{
@ -131,7 +179,7 @@ gst_mss_demux_reset (GstMssDemux * mssdemux)
for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
GstMssDemuxStream *stream = iter->data;
gst_element_remove_pad (GST_ELEMENT_CAST (mssdemux), stream->pad);
g_free (stream);
gst_mss_demux_stream_free (stream);
}
g_slist_free (mssdemux->streams);
mssdemux->streams = NULL;
@ -266,11 +314,8 @@ gst_mss_demux_create_streams (GstMssDemux * mssdemux)
continue;
}
stream = g_new (GstMssDemuxStream, 1);
stream->pad = srcpad;
stream->manifest_stream = manifeststream;
stream = gst_mss_demux_stream_new (mssdemux, manifeststream, srcpad);
mssdemux->streams = g_slist_append (mssdemux->streams, stream);
}
}
@ -327,3 +372,31 @@ gst_mss_demux_process_manifest (GstMssDemux * mssdemux)
gst_mss_demux_expose_stream (mssdemux, iter->data);
}
}
static void
gst_mss_demux_stream_loop (GstMssDemuxStream * stream)
{
gchar *url;
GstFragment *fragment;
GstBufferList *buflist;
GstFlowReturn ret;
ret = gst_mss_manifest_stream_get_fragment_url (stream->manifest_stream,
&url);
switch (ret) {
default:
break;
}
if (!url) {
/* TODO */
}
fragment = gst_uri_downloader_fetch_uri (stream->downloader, url);
g_free (url);
buflist = gst_fragment_get_buffer_list (fragment);
ret = gst_pad_push_list (stream->pad, buflist); /* TODO check return */
ret = gst_mss_manifest_stream_advance_fragment (stream->manifest_stream);
}

View file

@ -26,6 +26,7 @@
#include <gst/gst.h>
#include <gst/base/gstadapter.h>
#include "gstmssmanifest.h"
#include "gsturidownloader.h"
G_BEGIN_DECLS
@ -52,7 +53,15 @@ typedef struct _GstMssDemuxClass GstMssDemuxClass;
struct _GstMssDemuxStream {
GstPad *pad;
GstMssDemux *parent;
GstMssManifestStream *manifest_stream;
GstUriDownloader *downloader;
/* Streaming task */
GstTask *stream_task;
GStaticRecMutex stream_lock;
};
struct _GstMssDemux {