From 541aad907e517818a79c94f5397c6b3d94e5e6fa Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Wed, 27 Aug 2008 10:02:06 +0000 Subject: [PATCH] gst/realmedia/rdtmanager.c: Include the new rdt jitterbuffer in the session manager. Original commit message from CVS: * gst/realmedia/rdtmanager.c: (create_session), (activate_session), (free_session), (gst_rdt_manager_query_src), (gst_rdt_manager_src_activate_push), (gst_rdt_manager_handle_data_packet), (gst_rdt_manager_chain_rdt), (gst_rdt_manager_loop), (create_recv_rtp): Include the new rdt jitterbuffer in the session manager. --- ChangeLog | 9 + gst/realmedia/rdtmanager.c | 404 ++++++++++++++++++++++++++++++++----- 2 files changed, 359 insertions(+), 54 deletions(-) diff --git a/ChangeLog b/ChangeLog index ea517c84db..bd798bfa56 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,12 @@ +2008-08-27 Wim Taymans + + * gst/realmedia/rdtmanager.c: (create_session), (activate_session), + (free_session), (gst_rdt_manager_query_src), + (gst_rdt_manager_src_activate_push), + (gst_rdt_manager_handle_data_packet), (gst_rdt_manager_chain_rdt), + (gst_rdt_manager_loop), (create_recv_rtp): + Include the new rdt jitterbuffer in the session manager. + 2008-08-27 Wim Taymans * gst/realmedia/rdtdepay.c: (gst_rdt_depay_class_init), diff --git a/gst/realmedia/rdtmanager.c b/gst/realmedia/rdtmanager.c index aad041fe6e..144a0f46bf 100644 --- a/gst/realmedia/rdtmanager.c +++ b/gst/realmedia/rdtmanager.c @@ -52,7 +52,9 @@ /* #define HAVE_RTCP */ +#include "gstrdtbuffer.h" #include "rdtmanager.h" +#include "rdtjitterbuffer.h" GST_DEBUG_CATEGORY_STATIC (rdtmanager_debug); #define GST_CAT_DEFAULT (rdtmanager_debug) @@ -123,6 +125,10 @@ static void gst_rdt_manager_set_property (GObject * object, static void gst_rdt_manager_get_property (GObject * object, guint prop_id, GValue * value, GParamSpec * pspec); +static gboolean gst_rdt_manager_query_src (GstPad * pad, GstQuery * query); +static gboolean gst_rdt_manager_src_activate_push (GstPad * pad, + gboolean active); + static GstClock *gst_rdt_manager_provide_clock (GstElement * element); static GstStateChangeReturn gst_rdt_manager_change_state (GstElement * element, GstStateChange transition); @@ -130,11 +136,32 @@ static GstPad *gst_rdt_manager_request_new_pad (GstElement * element, GstPadTemplate * templ, const gchar * name); static void gst_rdt_manager_release_pad (GstElement * element, GstPad * pad); -static GstFlowReturn gst_rdt_manager_chain_rtp (GstPad * pad, +static GstFlowReturn gst_rdt_manager_chain_rdt (GstPad * pad, GstBuffer * buffer); static GstFlowReturn gst_rdt_manager_chain_rtcp (GstPad * pad, GstBuffer * buffer); +static void gst_rdt_manager_loop (GstPad * pad); +static guint gst_rdt_manager_signals[LAST_SIGNAL] = { 0 }; + +#define JBUF_LOCK(sess) (g_mutex_lock ((sess)->jbuf_lock)) + +#define JBUF_LOCK_CHECK(sess,label) G_STMT_START { \ + JBUF_LOCK (sess); \ + if (sess->srcresult != GST_FLOW_OK) \ + goto label; \ +} G_STMT_END + +#define JBUF_UNLOCK(sess) (g_mutex_unlock ((sess)->jbuf_lock)) +#define JBUF_WAIT(sess) (g_cond_wait ((sess)->jbuf_cond, (sess)->jbuf_lock)) + +#define JBUF_WAIT_CHECK(sess,label) G_STMT_START { \ + JBUF_WAIT(sess); \ + if (sess->srcresult != GST_FLOW_OK) \ + goto label; \ +} G_STMT_END + +#define JBUF_SIGNAL(sess) (g_cond_signal ((sess)->jbuf_cond)) /* Manages the receiving end of the packets. * @@ -152,13 +179,39 @@ struct _GstRDTManagerSession /* we only support one ssrc and one pt */ guint32 ssrc; guint8 pt; + gint clock_rate; GstCaps *caps; + GstSegment segment; + + /* the last seqnum we pushed out */ + guint32 last_popped_seqnum; + /* the next expected seqnum */ + guint32 next_seqnum; + /* last output time */ + GstClockTime last_out_time; + /* the pads of the session */ GstPad *recv_rtp_sink; GstPad *recv_rtp_src; GstPad *recv_rtcp_sink; GstPad *rtcp_src; + + GstFlowReturn srcresult; + gboolean blocked; + gboolean eos; + gboolean waiting; + gboolean discont; + GstClockID clock_id; + + /* jitterbuffer, lock and cond */ + RDTJitterBuffer *jbuf; + GMutex *jbuf_lock; + GCond *jbuf_cond; + + /* some accounting */ + guint64 num_late; + guint64 num_duplicates; }; /* find a session with the given id */ @@ -185,19 +238,77 @@ create_session (GstRDTManager * rdtmanager, gint id) sess = g_new0 (GstRDTManagerSession, 1); sess->id = id; sess->dec = rdtmanager; + sess->jbuf = rdt_jitter_buffer_new (); + sess->jbuf_lock = g_mutex_new (); + sess->jbuf_cond = g_cond_new (); rdtmanager->sessions = g_slist_prepend (rdtmanager->sessions, sess); return sess; } +static gboolean +activate_session (GstRDTManager * rdtmanager, GstRDTManagerSession * session, + guint32 ssrc, guint8 pt) +{ + GstPadTemplate *templ; + GstElementClass *klass; + gchar *name; + GstCaps *caps; + GValue ret = { 0 }; + GValue args[3] = { {0} + , {0} + , {0} + }; + + GST_DEBUG_OBJECT (rdtmanager, "creating stream"); + + session->ssrc = ssrc; + session->pt = pt; + + /* get pt map */ + g_value_init (&args[0], GST_TYPE_ELEMENT); + g_value_set_object (&args[0], rdtmanager); + g_value_init (&args[1], G_TYPE_UINT); + g_value_set_uint (&args[1], session->id); + g_value_init (&args[2], G_TYPE_UINT); + g_value_set_uint (&args[2], pt); + + g_value_init (&ret, GST_TYPE_CAPS); + g_value_set_boxed (&ret, NULL); + + g_signal_emitv (args, gst_rdt_manager_signals[SIGNAL_REQUEST_PT_MAP], 0, + &ret); + + caps = (GstCaps *) g_value_get_boxed (&ret); + + name = g_strdup_printf ("recv_rtp_src_%d_%u_%d", session->id, ssrc, pt); + klass = GST_ELEMENT_GET_CLASS (rdtmanager); + templ = gst_element_class_get_pad_template (klass, "recv_rtp_src_%d_%d_%d"); + session->recv_rtp_src = gst_pad_new_from_template (templ, name); + g_free (name); + + gst_pad_set_caps (session->recv_rtp_src, caps); + + gst_pad_set_element_private (session->recv_rtp_src, session); + gst_pad_set_query_function (session->recv_rtp_src, gst_rdt_manager_query_src); + gst_pad_set_activatepush_function (session->recv_rtp_src, + gst_rdt_manager_src_activate_push); + + gst_pad_set_active (session->recv_rtp_src, TRUE); + gst_element_add_pad (GST_ELEMENT_CAST (rdtmanager), session->recv_rtp_src); + + return TRUE; +} + static void free_session (GstRDTManagerSession * session) { + g_object_unref (session->jbuf); + g_cond_free (session->jbuf_cond); + g_mutex_free (session->jbuf_lock); g_free (session); } -static guint gst_rdt_manager_signals[LAST_SIGNAL] = { 0 }; - GST_BOILERPLATE (GstRDTManager, gst_rdt_manager, GstElement, GST_TYPE_ELEMENT); static void @@ -418,7 +529,8 @@ gst_rdt_manager_query_src (GstPad * pad, GstQuery * query) case GST_QUERY_LATENCY: { /* we pretend to be live with a 3 second latency */ - gst_query_set_latency (query, TRUE, 3 * GST_SECOND, -1); + gst_query_set_latency (query, TRUE, 5 * GST_SECOND, -1); + GST_DEBUG_OBJECT (rdtmanager, "reporting 5 seconds of latency"); res = TRUE; break; } @@ -429,18 +541,125 @@ gst_rdt_manager_query_src (GstPad * pad, GstQuery * query) return res; } +static gboolean +gst_rdt_manager_src_activate_push (GstPad * pad, gboolean active) +{ + gboolean result = TRUE; + GstRDTManager *rdtmanager; + GstRDTManagerSession *session; + + session = gst_pad_get_element_private (pad); + rdtmanager = session->dec; + + if (active) { + /* allow data processing */ + JBUF_LOCK (session); + GST_DEBUG_OBJECT (rdtmanager, "Enabling pop on queue"); + /* Mark as non flushing */ + session->srcresult = GST_FLOW_OK; + gst_segment_init (&session->segment, GST_FORMAT_TIME); + session->last_popped_seqnum = -1; + session->last_out_time = -1; + session->next_seqnum = -1; + session->eos = FALSE; + JBUF_UNLOCK (session); + + /* start pushing out buffers */ + GST_DEBUG_OBJECT (rdtmanager, "Starting task on srcpad"); + gst_pad_start_task (pad, (GstTaskFunction) gst_rdt_manager_loop, pad); + } else { + /* make sure all data processing stops ASAP */ + JBUF_LOCK (session); + /* mark ourselves as flushing */ + session->srcresult = GST_FLOW_WRONG_STATE; + GST_DEBUG_OBJECT (rdtmanager, "Disabling pop on queue"); + /* this unblocks any waiting pops on the src pad task */ + JBUF_SIGNAL (session); + /* unlock clock, we just unschedule, the entry will be released by + * the locking streaming thread. */ + if (session->clock_id) + gst_clock_id_unschedule (session->clock_id); + JBUF_UNLOCK (session); + + /* NOTE this will hardlock if the state change is called from the src pad + * task thread because we will _join() the thread. */ + GST_DEBUG_OBJECT (rdtmanager, "Stopping task on srcpad"); + result = gst_pad_stop_task (pad); + } + + return result; +} + static GstFlowReturn -gst_rdt_manager_chain_rtp (GstPad * pad, GstBuffer * buffer) +gst_rdt_manager_handle_data_packet (GstRDTManagerSession * session, + GstClockTime timestamp, GstRDTPacket * packet) +{ + GstRDTManager *rdtmanager; + guint16 seqnum; + gboolean tail; + GstFlowReturn res; + GstBuffer *buffer; + + rdtmanager = session->dec; + + res = GST_FLOW_OK; + + seqnum = 0; + GST_DEBUG_OBJECT (rdtmanager, + "Received packet #%d at time %" GST_TIME_FORMAT, seqnum, + GST_TIME_ARGS (timestamp)); + + buffer = gst_rdt_packet_to_buffer (packet); + + JBUF_LOCK_CHECK (session, out_flushing); + + /* insert the packet into the queue now, FIXME, use seqnum */ + if (!rdt_jitter_buffer_insert (session->jbuf, buffer, timestamp, + session->clock_rate, &tail)) + goto duplicate; + + /* signal addition of new buffer when the _loop is waiting. */ + if (session->waiting) + JBUF_SIGNAL (session); + +finished: + JBUF_UNLOCK (session); + + return res; + + /* ERRORS */ +out_flushing: + { + res = session->srcresult; + GST_DEBUG_OBJECT (rdtmanager, "flushing %s", gst_flow_get_name (res)); + gst_buffer_unref (buffer); + goto finished; + } +duplicate: + { + GST_WARNING_OBJECT (rdtmanager, "Duplicate packet #%d detected, dropping", + seqnum); + session->num_duplicates++; + gst_buffer_unref (buffer); + goto finished; + } +} + +static GstFlowReturn +gst_rdt_manager_chain_rdt (GstPad * pad, GstBuffer * buffer) { GstFlowReturn res; GstRDTManager *rdtmanager; GstRDTManagerSession *session; + GstClockTime timestamp; + GstRDTPacket packet; guint32 ssrc; guint8 pt; + gboolean more; rdtmanager = GST_RDT_MANAGER (GST_PAD_PARENT (pad)); - GST_DEBUG_OBJECT (rdtmanager, "got rtp packet"); + GST_DEBUG_OBJECT (rdtmanager, "got RDT packet"); ssrc = 0; pt = 0; @@ -452,61 +671,138 @@ gst_rdt_manager_chain_rtp (GstPad * pad, GstBuffer * buffer) /* see if we have the pad */ if (!session->active) { - GstPadTemplate *templ; - GstElementClass *klass; - gchar *name; - GstCaps *caps; - GValue ret = { 0 }; - GValue args[3] = { {0} - , {0} - , {0} - }; - - GST_DEBUG_OBJECT (rdtmanager, "creating stream"); - - session->ssrc = ssrc; - session->pt = pt; - - /* get pt map */ - g_value_init (&args[0], GST_TYPE_ELEMENT); - g_value_set_object (&args[0], rdtmanager); - g_value_init (&args[1], G_TYPE_UINT); - g_value_set_uint (&args[1], session->id); - g_value_init (&args[2], G_TYPE_UINT); - g_value_set_uint (&args[2], pt); - - g_value_init (&ret, GST_TYPE_CAPS); - g_value_set_boxed (&ret, NULL); - - g_signal_emitv (args, gst_rdt_manager_signals[SIGNAL_REQUEST_PT_MAP], 0, - &ret); - - caps = (GstCaps *) g_value_get_boxed (&ret); - - name = g_strdup_printf ("recv_rtp_src_%d_%u_%d", session->id, ssrc, pt); - klass = GST_ELEMENT_GET_CLASS (rdtmanager); - templ = gst_element_class_get_pad_template (klass, "recv_rtp_src_%d_%d_%d"); - session->recv_rtp_src = gst_pad_new_from_template (templ, name); - g_free (name); - - gst_pad_set_caps (session->recv_rtp_src, caps); - - gst_pad_set_element_private (session->recv_rtp_src, session); - gst_pad_set_query_function (session->recv_rtp_src, - gst_rdt_manager_query_src); - gst_pad_set_active (session->recv_rtp_src, TRUE); - gst_element_add_pad (GST_ELEMENT_CAST (rdtmanager), session->recv_rtp_src); - + activate_session (rdtmanager, session, ssrc, pt); session->active = TRUE; } - gst_buffer_set_caps (buffer, GST_PAD_CAPS (session->recv_rtp_src)); + if (GST_BUFFER_IS_DISCONT (buffer)) { + GST_DEBUG_OBJECT (rdtmanager, "received discont"); + session->discont = TRUE; + } - res = gst_pad_push (session->recv_rtp_src, buffer); + res = GST_FLOW_OK; + + /* take the timestamp of the buffer. This is the time when the packet was + * received and is used to calculate jitter and clock skew. We will adjust + * this timestamp with the smoothed value after processing it in the + * jitterbuffer. */ + timestamp = GST_BUFFER_TIMESTAMP (buffer); + /* bring to running time */ + timestamp = gst_segment_to_running_time (&session->segment, GST_FORMAT_TIME, + timestamp); + + more = gst_rdt_buffer_get_first_packet (buffer, &packet); + while (more) { + GstRDTType type; + + type = gst_rdt_packet_get_type (&packet); + GST_DEBUG_OBJECT (rdtmanager, "Have packet of type %04x", type); + + if (GST_RDT_IS_DATA_TYPE (type)) { + GST_DEBUG_OBJECT (rdtmanager, "We have a data packet"); + res = gst_rdt_manager_handle_data_packet (session, timestamp, &packet); + } else { + switch (type) { + default: + GST_DEBUG_OBJECT (rdtmanager, "Ignoring packet"); + break; + } + } + if (res != GST_FLOW_OK) + break; + + more = gst_rdt_packet_move_to_next (&packet); + } + + gst_buffer_unref (buffer); return res; } +/* push packets from the queue to the downstream demuxer */ +static void +gst_rdt_manager_loop (GstPad * pad) +{ + GstRDTManager *rdtmanager; + GstRDTManagerSession *session; + GstBuffer *buffer; + GstFlowReturn result; + + rdtmanager = GST_RDT_MANAGER (GST_PAD_PARENT (pad)); + + session = gst_pad_get_element_private (pad); + + JBUF_LOCK_CHECK (session, flushing); + GST_DEBUG_OBJECT (rdtmanager, "Peeking item"); + while (TRUE) { + /* always wait if we are blocked */ + if (!session->blocked) { + /* if we have a packet, we can exit the loop and grab it */ + if (rdt_jitter_buffer_num_packets (session->jbuf) > 0) + break; + /* no packets but we are EOS, do eos logic */ + if (session->eos) + goto do_eos; + } + /* underrun, wait for packets or flushing now */ + session->waiting = TRUE; + JBUF_WAIT_CHECK (session, flushing); + session->waiting = FALSE; + } + + buffer = rdt_jitter_buffer_pop (session->jbuf); + + GST_DEBUG_OBJECT (rdtmanager, "Got item %p", buffer); + + if (session->discont) { + GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT); + session->discont = FALSE; + } + + gst_buffer_set_caps (buffer, GST_PAD_CAPS (session->recv_rtp_src)); + JBUF_UNLOCK (session); + + result = gst_pad_push (session->recv_rtp_src, buffer); + if (result != GST_FLOW_OK) + goto pause; + + return; + + /* ERRORS */ +flushing: + { + GST_DEBUG_OBJECT (rdtmanager, "we are flushing"); + gst_pad_pause_task (session->recv_rtp_src); + JBUF_UNLOCK (session); + return; + } +do_eos: + { + /* store result, we are flushing now */ + GST_DEBUG_OBJECT (rdtmanager, "We are EOS, pushing EOS downstream"); + session->srcresult = GST_FLOW_UNEXPECTED; + gst_pad_pause_task (session->recv_rtp_src); + gst_pad_push_event (session->recv_rtp_src, gst_event_new_eos ()); + JBUF_UNLOCK (session); + return; + } +pause: + { + const gchar *reason = gst_flow_get_name (result); + + GST_DEBUG_OBJECT (rdtmanager, "pausing task, reason %s", reason); + + JBUF_LOCK (session); + /* store result */ + session->srcresult = result; + /* we don't post errors or anything because upstream will do that for us + * when we pass the return value upstream. */ + gst_pad_pause_task (session->recv_rtp_src); + JBUF_UNLOCK (session); + return; + } +} + static GstFlowReturn gst_rdt_manager_chain_rtcp (GstPad * pad, GstBuffer * buffer) { @@ -771,7 +1067,7 @@ create_recv_rtp (GstRDTManager * rdtmanager, GstPadTemplate * templ, session->recv_rtp_sink = gst_pad_new_from_template (templ, name); gst_pad_set_element_private (session->recv_rtp_sink, session); gst_pad_set_chain_function (session->recv_rtp_sink, - gst_rdt_manager_chain_rtp); + gst_rdt_manager_chain_rdt); gst_pad_set_active (session->recv_rtp_sink, TRUE); gst_element_add_pad (GST_ELEMENT_CAST (rdtmanager), session->recv_rtp_sink);