mirror of
https://gitlab.freedesktop.org/gstreamer/gstreamer.git
synced 2024-11-27 04:01:08 +00:00
gst/realmedia/rdtmanager.c: Include the new rdt jitterbuffer in the session manager.
Original commit message from CVS: * gst/realmedia/rdtmanager.c: (create_session), (activate_session), (free_session), (gst_rdt_manager_query_src), (gst_rdt_manager_src_activate_push), (gst_rdt_manager_handle_data_packet), (gst_rdt_manager_chain_rdt), (gst_rdt_manager_loop), (create_recv_rtp): Include the new rdt jitterbuffer in the session manager.
This commit is contained in:
parent
6367c03a1d
commit
541aad907e
2 changed files with 359 additions and 54 deletions
|
@ -1,3 +1,12 @@
|
||||||
|
2008-08-27 Wim Taymans <wim.taymans@collabora.co.uk>
|
||||||
|
|
||||||
|
* gst/realmedia/rdtmanager.c: (create_session), (activate_session),
|
||||||
|
(free_session), (gst_rdt_manager_query_src),
|
||||||
|
(gst_rdt_manager_src_activate_push),
|
||||||
|
(gst_rdt_manager_handle_data_packet), (gst_rdt_manager_chain_rdt),
|
||||||
|
(gst_rdt_manager_loop), (create_recv_rtp):
|
||||||
|
Include the new rdt jitterbuffer in the session manager.
|
||||||
|
|
||||||
2008-08-27 Wim Taymans <wim.taymans@collabora.co.uk>
|
2008-08-27 Wim Taymans <wim.taymans@collabora.co.uk>
|
||||||
|
|
||||||
* gst/realmedia/rdtdepay.c: (gst_rdt_depay_class_init),
|
* gst/realmedia/rdtdepay.c: (gst_rdt_depay_class_init),
|
||||||
|
|
|
@ -52,7 +52,9 @@
|
||||||
|
|
||||||
/* #define HAVE_RTCP */
|
/* #define HAVE_RTCP */
|
||||||
|
|
||||||
|
#include "gstrdtbuffer.h"
|
||||||
#include "rdtmanager.h"
|
#include "rdtmanager.h"
|
||||||
|
#include "rdtjitterbuffer.h"
|
||||||
|
|
||||||
GST_DEBUG_CATEGORY_STATIC (rdtmanager_debug);
|
GST_DEBUG_CATEGORY_STATIC (rdtmanager_debug);
|
||||||
#define GST_CAT_DEFAULT (rdtmanager_debug)
|
#define GST_CAT_DEFAULT (rdtmanager_debug)
|
||||||
|
@ -123,6 +125,10 @@ static void gst_rdt_manager_set_property (GObject * object,
|
||||||
static void gst_rdt_manager_get_property (GObject * object,
|
static void gst_rdt_manager_get_property (GObject * object,
|
||||||
guint prop_id, GValue * value, GParamSpec * pspec);
|
guint prop_id, GValue * value, GParamSpec * pspec);
|
||||||
|
|
||||||
|
static gboolean gst_rdt_manager_query_src (GstPad * pad, GstQuery * query);
|
||||||
|
static gboolean gst_rdt_manager_src_activate_push (GstPad * pad,
|
||||||
|
gboolean active);
|
||||||
|
|
||||||
static GstClock *gst_rdt_manager_provide_clock (GstElement * element);
|
static GstClock *gst_rdt_manager_provide_clock (GstElement * element);
|
||||||
static GstStateChangeReturn gst_rdt_manager_change_state (GstElement * element,
|
static GstStateChangeReturn gst_rdt_manager_change_state (GstElement * element,
|
||||||
GstStateChange transition);
|
GstStateChange transition);
|
||||||
|
@ -130,11 +136,32 @@ static GstPad *gst_rdt_manager_request_new_pad (GstElement * element,
|
||||||
GstPadTemplate * templ, const gchar * name);
|
GstPadTemplate * templ, const gchar * name);
|
||||||
static void gst_rdt_manager_release_pad (GstElement * element, GstPad * pad);
|
static void gst_rdt_manager_release_pad (GstElement * element, GstPad * pad);
|
||||||
|
|
||||||
static GstFlowReturn gst_rdt_manager_chain_rtp (GstPad * pad,
|
static GstFlowReturn gst_rdt_manager_chain_rdt (GstPad * pad,
|
||||||
GstBuffer * buffer);
|
GstBuffer * buffer);
|
||||||
static GstFlowReturn gst_rdt_manager_chain_rtcp (GstPad * pad,
|
static GstFlowReturn gst_rdt_manager_chain_rtcp (GstPad * pad,
|
||||||
GstBuffer * buffer);
|
GstBuffer * buffer);
|
||||||
|
static void gst_rdt_manager_loop (GstPad * pad);
|
||||||
|
|
||||||
|
static guint gst_rdt_manager_signals[LAST_SIGNAL] = { 0 };
|
||||||
|
|
||||||
|
#define JBUF_LOCK(sess) (g_mutex_lock ((sess)->jbuf_lock))
|
||||||
|
|
||||||
|
#define JBUF_LOCK_CHECK(sess,label) G_STMT_START { \
|
||||||
|
JBUF_LOCK (sess); \
|
||||||
|
if (sess->srcresult != GST_FLOW_OK) \
|
||||||
|
goto label; \
|
||||||
|
} G_STMT_END
|
||||||
|
|
||||||
|
#define JBUF_UNLOCK(sess) (g_mutex_unlock ((sess)->jbuf_lock))
|
||||||
|
#define JBUF_WAIT(sess) (g_cond_wait ((sess)->jbuf_cond, (sess)->jbuf_lock))
|
||||||
|
|
||||||
|
#define JBUF_WAIT_CHECK(sess,label) G_STMT_START { \
|
||||||
|
JBUF_WAIT(sess); \
|
||||||
|
if (sess->srcresult != GST_FLOW_OK) \
|
||||||
|
goto label; \
|
||||||
|
} G_STMT_END
|
||||||
|
|
||||||
|
#define JBUF_SIGNAL(sess) (g_cond_signal ((sess)->jbuf_cond))
|
||||||
|
|
||||||
/* Manages the receiving end of the packets.
|
/* Manages the receiving end of the packets.
|
||||||
*
|
*
|
||||||
|
@ -152,13 +179,39 @@ struct _GstRDTManagerSession
|
||||||
/* we only support one ssrc and one pt */
|
/* we only support one ssrc and one pt */
|
||||||
guint32 ssrc;
|
guint32 ssrc;
|
||||||
guint8 pt;
|
guint8 pt;
|
||||||
|
gint clock_rate;
|
||||||
GstCaps *caps;
|
GstCaps *caps;
|
||||||
|
|
||||||
|
GstSegment segment;
|
||||||
|
|
||||||
|
/* the last seqnum we pushed out */
|
||||||
|
guint32 last_popped_seqnum;
|
||||||
|
/* the next expected seqnum */
|
||||||
|
guint32 next_seqnum;
|
||||||
|
/* last output time */
|
||||||
|
GstClockTime last_out_time;
|
||||||
|
|
||||||
/* the pads of the session */
|
/* the pads of the session */
|
||||||
GstPad *recv_rtp_sink;
|
GstPad *recv_rtp_sink;
|
||||||
GstPad *recv_rtp_src;
|
GstPad *recv_rtp_src;
|
||||||
GstPad *recv_rtcp_sink;
|
GstPad *recv_rtcp_sink;
|
||||||
GstPad *rtcp_src;
|
GstPad *rtcp_src;
|
||||||
|
|
||||||
|
GstFlowReturn srcresult;
|
||||||
|
gboolean blocked;
|
||||||
|
gboolean eos;
|
||||||
|
gboolean waiting;
|
||||||
|
gboolean discont;
|
||||||
|
GstClockID clock_id;
|
||||||
|
|
||||||
|
/* jitterbuffer, lock and cond */
|
||||||
|
RDTJitterBuffer *jbuf;
|
||||||
|
GMutex *jbuf_lock;
|
||||||
|
GCond *jbuf_cond;
|
||||||
|
|
||||||
|
/* some accounting */
|
||||||
|
guint64 num_late;
|
||||||
|
guint64 num_duplicates;
|
||||||
};
|
};
|
||||||
|
|
||||||
/* find a session with the given id */
|
/* find a session with the given id */
|
||||||
|
@ -185,19 +238,77 @@ create_session (GstRDTManager * rdtmanager, gint id)
|
||||||
sess = g_new0 (GstRDTManagerSession, 1);
|
sess = g_new0 (GstRDTManagerSession, 1);
|
||||||
sess->id = id;
|
sess->id = id;
|
||||||
sess->dec = rdtmanager;
|
sess->dec = rdtmanager;
|
||||||
|
sess->jbuf = rdt_jitter_buffer_new ();
|
||||||
|
sess->jbuf_lock = g_mutex_new ();
|
||||||
|
sess->jbuf_cond = g_cond_new ();
|
||||||
rdtmanager->sessions = g_slist_prepend (rdtmanager->sessions, sess);
|
rdtmanager->sessions = g_slist_prepend (rdtmanager->sessions, sess);
|
||||||
|
|
||||||
return sess;
|
return sess;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static gboolean
|
||||||
|
activate_session (GstRDTManager * rdtmanager, GstRDTManagerSession * session,
|
||||||
|
guint32 ssrc, guint8 pt)
|
||||||
|
{
|
||||||
|
GstPadTemplate *templ;
|
||||||
|
GstElementClass *klass;
|
||||||
|
gchar *name;
|
||||||
|
GstCaps *caps;
|
||||||
|
GValue ret = { 0 };
|
||||||
|
GValue args[3] = { {0}
|
||||||
|
, {0}
|
||||||
|
, {0}
|
||||||
|
};
|
||||||
|
|
||||||
|
GST_DEBUG_OBJECT (rdtmanager, "creating stream");
|
||||||
|
|
||||||
|
session->ssrc = ssrc;
|
||||||
|
session->pt = pt;
|
||||||
|
|
||||||
|
/* get pt map */
|
||||||
|
g_value_init (&args[0], GST_TYPE_ELEMENT);
|
||||||
|
g_value_set_object (&args[0], rdtmanager);
|
||||||
|
g_value_init (&args[1], G_TYPE_UINT);
|
||||||
|
g_value_set_uint (&args[1], session->id);
|
||||||
|
g_value_init (&args[2], G_TYPE_UINT);
|
||||||
|
g_value_set_uint (&args[2], pt);
|
||||||
|
|
||||||
|
g_value_init (&ret, GST_TYPE_CAPS);
|
||||||
|
g_value_set_boxed (&ret, NULL);
|
||||||
|
|
||||||
|
g_signal_emitv (args, gst_rdt_manager_signals[SIGNAL_REQUEST_PT_MAP], 0,
|
||||||
|
&ret);
|
||||||
|
|
||||||
|
caps = (GstCaps *) g_value_get_boxed (&ret);
|
||||||
|
|
||||||
|
name = g_strdup_printf ("recv_rtp_src_%d_%u_%d", session->id, ssrc, pt);
|
||||||
|
klass = GST_ELEMENT_GET_CLASS (rdtmanager);
|
||||||
|
templ = gst_element_class_get_pad_template (klass, "recv_rtp_src_%d_%d_%d");
|
||||||
|
session->recv_rtp_src = gst_pad_new_from_template (templ, name);
|
||||||
|
g_free (name);
|
||||||
|
|
||||||
|
gst_pad_set_caps (session->recv_rtp_src, caps);
|
||||||
|
|
||||||
|
gst_pad_set_element_private (session->recv_rtp_src, session);
|
||||||
|
gst_pad_set_query_function (session->recv_rtp_src, gst_rdt_manager_query_src);
|
||||||
|
gst_pad_set_activatepush_function (session->recv_rtp_src,
|
||||||
|
gst_rdt_manager_src_activate_push);
|
||||||
|
|
||||||
|
gst_pad_set_active (session->recv_rtp_src, TRUE);
|
||||||
|
gst_element_add_pad (GST_ELEMENT_CAST (rdtmanager), session->recv_rtp_src);
|
||||||
|
|
||||||
|
return TRUE;
|
||||||
|
}
|
||||||
|
|
||||||
static void
|
static void
|
||||||
free_session (GstRDTManagerSession * session)
|
free_session (GstRDTManagerSession * session)
|
||||||
{
|
{
|
||||||
|
g_object_unref (session->jbuf);
|
||||||
|
g_cond_free (session->jbuf_cond);
|
||||||
|
g_mutex_free (session->jbuf_lock);
|
||||||
g_free (session);
|
g_free (session);
|
||||||
}
|
}
|
||||||
|
|
||||||
static guint gst_rdt_manager_signals[LAST_SIGNAL] = { 0 };
|
|
||||||
|
|
||||||
GST_BOILERPLATE (GstRDTManager, gst_rdt_manager, GstElement, GST_TYPE_ELEMENT);
|
GST_BOILERPLATE (GstRDTManager, gst_rdt_manager, GstElement, GST_TYPE_ELEMENT);
|
||||||
|
|
||||||
static void
|
static void
|
||||||
|
@ -418,7 +529,8 @@ gst_rdt_manager_query_src (GstPad * pad, GstQuery * query)
|
||||||
case GST_QUERY_LATENCY:
|
case GST_QUERY_LATENCY:
|
||||||
{
|
{
|
||||||
/* we pretend to be live with a 3 second latency */
|
/* we pretend to be live with a 3 second latency */
|
||||||
gst_query_set_latency (query, TRUE, 3 * GST_SECOND, -1);
|
gst_query_set_latency (query, TRUE, 5 * GST_SECOND, -1);
|
||||||
|
GST_DEBUG_OBJECT (rdtmanager, "reporting 5 seconds of latency");
|
||||||
res = TRUE;
|
res = TRUE;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -429,18 +541,125 @@ gst_rdt_manager_query_src (GstPad * pad, GstQuery * query)
|
||||||
return res;
|
return res;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static gboolean
|
||||||
|
gst_rdt_manager_src_activate_push (GstPad * pad, gboolean active)
|
||||||
|
{
|
||||||
|
gboolean result = TRUE;
|
||||||
|
GstRDTManager *rdtmanager;
|
||||||
|
GstRDTManagerSession *session;
|
||||||
|
|
||||||
|
session = gst_pad_get_element_private (pad);
|
||||||
|
rdtmanager = session->dec;
|
||||||
|
|
||||||
|
if (active) {
|
||||||
|
/* allow data processing */
|
||||||
|
JBUF_LOCK (session);
|
||||||
|
GST_DEBUG_OBJECT (rdtmanager, "Enabling pop on queue");
|
||||||
|
/* Mark as non flushing */
|
||||||
|
session->srcresult = GST_FLOW_OK;
|
||||||
|
gst_segment_init (&session->segment, GST_FORMAT_TIME);
|
||||||
|
session->last_popped_seqnum = -1;
|
||||||
|
session->last_out_time = -1;
|
||||||
|
session->next_seqnum = -1;
|
||||||
|
session->eos = FALSE;
|
||||||
|
JBUF_UNLOCK (session);
|
||||||
|
|
||||||
|
/* start pushing out buffers */
|
||||||
|
GST_DEBUG_OBJECT (rdtmanager, "Starting task on srcpad");
|
||||||
|
gst_pad_start_task (pad, (GstTaskFunction) gst_rdt_manager_loop, pad);
|
||||||
|
} else {
|
||||||
|
/* make sure all data processing stops ASAP */
|
||||||
|
JBUF_LOCK (session);
|
||||||
|
/* mark ourselves as flushing */
|
||||||
|
session->srcresult = GST_FLOW_WRONG_STATE;
|
||||||
|
GST_DEBUG_OBJECT (rdtmanager, "Disabling pop on queue");
|
||||||
|
/* this unblocks any waiting pops on the src pad task */
|
||||||
|
JBUF_SIGNAL (session);
|
||||||
|
/* unlock clock, we just unschedule, the entry will be released by
|
||||||
|
* the locking streaming thread. */
|
||||||
|
if (session->clock_id)
|
||||||
|
gst_clock_id_unschedule (session->clock_id);
|
||||||
|
JBUF_UNLOCK (session);
|
||||||
|
|
||||||
|
/* NOTE this will hardlock if the state change is called from the src pad
|
||||||
|
* task thread because we will _join() the thread. */
|
||||||
|
GST_DEBUG_OBJECT (rdtmanager, "Stopping task on srcpad");
|
||||||
|
result = gst_pad_stop_task (pad);
|
||||||
|
}
|
||||||
|
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
static GstFlowReturn
|
static GstFlowReturn
|
||||||
gst_rdt_manager_chain_rtp (GstPad * pad, GstBuffer * buffer)
|
gst_rdt_manager_handle_data_packet (GstRDTManagerSession * session,
|
||||||
|
GstClockTime timestamp, GstRDTPacket * packet)
|
||||||
|
{
|
||||||
|
GstRDTManager *rdtmanager;
|
||||||
|
guint16 seqnum;
|
||||||
|
gboolean tail;
|
||||||
|
GstFlowReturn res;
|
||||||
|
GstBuffer *buffer;
|
||||||
|
|
||||||
|
rdtmanager = session->dec;
|
||||||
|
|
||||||
|
res = GST_FLOW_OK;
|
||||||
|
|
||||||
|
seqnum = 0;
|
||||||
|
GST_DEBUG_OBJECT (rdtmanager,
|
||||||
|
"Received packet #%d at time %" GST_TIME_FORMAT, seqnum,
|
||||||
|
GST_TIME_ARGS (timestamp));
|
||||||
|
|
||||||
|
buffer = gst_rdt_packet_to_buffer (packet);
|
||||||
|
|
||||||
|
JBUF_LOCK_CHECK (session, out_flushing);
|
||||||
|
|
||||||
|
/* insert the packet into the queue now, FIXME, use seqnum */
|
||||||
|
if (!rdt_jitter_buffer_insert (session->jbuf, buffer, timestamp,
|
||||||
|
session->clock_rate, &tail))
|
||||||
|
goto duplicate;
|
||||||
|
|
||||||
|
/* signal addition of new buffer when the _loop is waiting. */
|
||||||
|
if (session->waiting)
|
||||||
|
JBUF_SIGNAL (session);
|
||||||
|
|
||||||
|
finished:
|
||||||
|
JBUF_UNLOCK (session);
|
||||||
|
|
||||||
|
return res;
|
||||||
|
|
||||||
|
/* ERRORS */
|
||||||
|
out_flushing:
|
||||||
|
{
|
||||||
|
res = session->srcresult;
|
||||||
|
GST_DEBUG_OBJECT (rdtmanager, "flushing %s", gst_flow_get_name (res));
|
||||||
|
gst_buffer_unref (buffer);
|
||||||
|
goto finished;
|
||||||
|
}
|
||||||
|
duplicate:
|
||||||
|
{
|
||||||
|
GST_WARNING_OBJECT (rdtmanager, "Duplicate packet #%d detected, dropping",
|
||||||
|
seqnum);
|
||||||
|
session->num_duplicates++;
|
||||||
|
gst_buffer_unref (buffer);
|
||||||
|
goto finished;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static GstFlowReturn
|
||||||
|
gst_rdt_manager_chain_rdt (GstPad * pad, GstBuffer * buffer)
|
||||||
{
|
{
|
||||||
GstFlowReturn res;
|
GstFlowReturn res;
|
||||||
GstRDTManager *rdtmanager;
|
GstRDTManager *rdtmanager;
|
||||||
GstRDTManagerSession *session;
|
GstRDTManagerSession *session;
|
||||||
|
GstClockTime timestamp;
|
||||||
|
GstRDTPacket packet;
|
||||||
guint32 ssrc;
|
guint32 ssrc;
|
||||||
guint8 pt;
|
guint8 pt;
|
||||||
|
gboolean more;
|
||||||
|
|
||||||
rdtmanager = GST_RDT_MANAGER (GST_PAD_PARENT (pad));
|
rdtmanager = GST_RDT_MANAGER (GST_PAD_PARENT (pad));
|
||||||
|
|
||||||
GST_DEBUG_OBJECT (rdtmanager, "got rtp packet");
|
GST_DEBUG_OBJECT (rdtmanager, "got RDT packet");
|
||||||
|
|
||||||
ssrc = 0;
|
ssrc = 0;
|
||||||
pt = 0;
|
pt = 0;
|
||||||
|
@ -452,61 +671,138 @@ gst_rdt_manager_chain_rtp (GstPad * pad, GstBuffer * buffer)
|
||||||
|
|
||||||
/* see if we have the pad */
|
/* see if we have the pad */
|
||||||
if (!session->active) {
|
if (!session->active) {
|
||||||
GstPadTemplate *templ;
|
activate_session (rdtmanager, session, ssrc, pt);
|
||||||
GstElementClass *klass;
|
|
||||||
gchar *name;
|
|
||||||
GstCaps *caps;
|
|
||||||
GValue ret = { 0 };
|
|
||||||
GValue args[3] = { {0}
|
|
||||||
, {0}
|
|
||||||
, {0}
|
|
||||||
};
|
|
||||||
|
|
||||||
GST_DEBUG_OBJECT (rdtmanager, "creating stream");
|
|
||||||
|
|
||||||
session->ssrc = ssrc;
|
|
||||||
session->pt = pt;
|
|
||||||
|
|
||||||
/* get pt map */
|
|
||||||
g_value_init (&args[0], GST_TYPE_ELEMENT);
|
|
||||||
g_value_set_object (&args[0], rdtmanager);
|
|
||||||
g_value_init (&args[1], G_TYPE_UINT);
|
|
||||||
g_value_set_uint (&args[1], session->id);
|
|
||||||
g_value_init (&args[2], G_TYPE_UINT);
|
|
||||||
g_value_set_uint (&args[2], pt);
|
|
||||||
|
|
||||||
g_value_init (&ret, GST_TYPE_CAPS);
|
|
||||||
g_value_set_boxed (&ret, NULL);
|
|
||||||
|
|
||||||
g_signal_emitv (args, gst_rdt_manager_signals[SIGNAL_REQUEST_PT_MAP], 0,
|
|
||||||
&ret);
|
|
||||||
|
|
||||||
caps = (GstCaps *) g_value_get_boxed (&ret);
|
|
||||||
|
|
||||||
name = g_strdup_printf ("recv_rtp_src_%d_%u_%d", session->id, ssrc, pt);
|
|
||||||
klass = GST_ELEMENT_GET_CLASS (rdtmanager);
|
|
||||||
templ = gst_element_class_get_pad_template (klass, "recv_rtp_src_%d_%d_%d");
|
|
||||||
session->recv_rtp_src = gst_pad_new_from_template (templ, name);
|
|
||||||
g_free (name);
|
|
||||||
|
|
||||||
gst_pad_set_caps (session->recv_rtp_src, caps);
|
|
||||||
|
|
||||||
gst_pad_set_element_private (session->recv_rtp_src, session);
|
|
||||||
gst_pad_set_query_function (session->recv_rtp_src,
|
|
||||||
gst_rdt_manager_query_src);
|
|
||||||
gst_pad_set_active (session->recv_rtp_src, TRUE);
|
|
||||||
gst_element_add_pad (GST_ELEMENT_CAST (rdtmanager), session->recv_rtp_src);
|
|
||||||
|
|
||||||
session->active = TRUE;
|
session->active = TRUE;
|
||||||
}
|
}
|
||||||
|
|
||||||
gst_buffer_set_caps (buffer, GST_PAD_CAPS (session->recv_rtp_src));
|
if (GST_BUFFER_IS_DISCONT (buffer)) {
|
||||||
|
GST_DEBUG_OBJECT (rdtmanager, "received discont");
|
||||||
|
session->discont = TRUE;
|
||||||
|
}
|
||||||
|
|
||||||
res = gst_pad_push (session->recv_rtp_src, buffer);
|
res = GST_FLOW_OK;
|
||||||
|
|
||||||
|
/* take the timestamp of the buffer. This is the time when the packet was
|
||||||
|
* received and is used to calculate jitter and clock skew. We will adjust
|
||||||
|
* this timestamp with the smoothed value after processing it in the
|
||||||
|
* jitterbuffer. */
|
||||||
|
timestamp = GST_BUFFER_TIMESTAMP (buffer);
|
||||||
|
/* bring to running time */
|
||||||
|
timestamp = gst_segment_to_running_time (&session->segment, GST_FORMAT_TIME,
|
||||||
|
timestamp);
|
||||||
|
|
||||||
|
more = gst_rdt_buffer_get_first_packet (buffer, &packet);
|
||||||
|
while (more) {
|
||||||
|
GstRDTType type;
|
||||||
|
|
||||||
|
type = gst_rdt_packet_get_type (&packet);
|
||||||
|
GST_DEBUG_OBJECT (rdtmanager, "Have packet of type %04x", type);
|
||||||
|
|
||||||
|
if (GST_RDT_IS_DATA_TYPE (type)) {
|
||||||
|
GST_DEBUG_OBJECT (rdtmanager, "We have a data packet");
|
||||||
|
res = gst_rdt_manager_handle_data_packet (session, timestamp, &packet);
|
||||||
|
} else {
|
||||||
|
switch (type) {
|
||||||
|
default:
|
||||||
|
GST_DEBUG_OBJECT (rdtmanager, "Ignoring packet");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (res != GST_FLOW_OK)
|
||||||
|
break;
|
||||||
|
|
||||||
|
more = gst_rdt_packet_move_to_next (&packet);
|
||||||
|
}
|
||||||
|
|
||||||
|
gst_buffer_unref (buffer);
|
||||||
|
|
||||||
return res;
|
return res;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* push packets from the queue to the downstream demuxer */
|
||||||
|
static void
|
||||||
|
gst_rdt_manager_loop (GstPad * pad)
|
||||||
|
{
|
||||||
|
GstRDTManager *rdtmanager;
|
||||||
|
GstRDTManagerSession *session;
|
||||||
|
GstBuffer *buffer;
|
||||||
|
GstFlowReturn result;
|
||||||
|
|
||||||
|
rdtmanager = GST_RDT_MANAGER (GST_PAD_PARENT (pad));
|
||||||
|
|
||||||
|
session = gst_pad_get_element_private (pad);
|
||||||
|
|
||||||
|
JBUF_LOCK_CHECK (session, flushing);
|
||||||
|
GST_DEBUG_OBJECT (rdtmanager, "Peeking item");
|
||||||
|
while (TRUE) {
|
||||||
|
/* always wait if we are blocked */
|
||||||
|
if (!session->blocked) {
|
||||||
|
/* if we have a packet, we can exit the loop and grab it */
|
||||||
|
if (rdt_jitter_buffer_num_packets (session->jbuf) > 0)
|
||||||
|
break;
|
||||||
|
/* no packets but we are EOS, do eos logic */
|
||||||
|
if (session->eos)
|
||||||
|
goto do_eos;
|
||||||
|
}
|
||||||
|
/* underrun, wait for packets or flushing now */
|
||||||
|
session->waiting = TRUE;
|
||||||
|
JBUF_WAIT_CHECK (session, flushing);
|
||||||
|
session->waiting = FALSE;
|
||||||
|
}
|
||||||
|
|
||||||
|
buffer = rdt_jitter_buffer_pop (session->jbuf);
|
||||||
|
|
||||||
|
GST_DEBUG_OBJECT (rdtmanager, "Got item %p", buffer);
|
||||||
|
|
||||||
|
if (session->discont) {
|
||||||
|
GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
|
||||||
|
session->discont = FALSE;
|
||||||
|
}
|
||||||
|
|
||||||
|
gst_buffer_set_caps (buffer, GST_PAD_CAPS (session->recv_rtp_src));
|
||||||
|
JBUF_UNLOCK (session);
|
||||||
|
|
||||||
|
result = gst_pad_push (session->recv_rtp_src, buffer);
|
||||||
|
if (result != GST_FLOW_OK)
|
||||||
|
goto pause;
|
||||||
|
|
||||||
|
return;
|
||||||
|
|
||||||
|
/* ERRORS */
|
||||||
|
flushing:
|
||||||
|
{
|
||||||
|
GST_DEBUG_OBJECT (rdtmanager, "we are flushing");
|
||||||
|
gst_pad_pause_task (session->recv_rtp_src);
|
||||||
|
JBUF_UNLOCK (session);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
do_eos:
|
||||||
|
{
|
||||||
|
/* store result, we are flushing now */
|
||||||
|
GST_DEBUG_OBJECT (rdtmanager, "We are EOS, pushing EOS downstream");
|
||||||
|
session->srcresult = GST_FLOW_UNEXPECTED;
|
||||||
|
gst_pad_pause_task (session->recv_rtp_src);
|
||||||
|
gst_pad_push_event (session->recv_rtp_src, gst_event_new_eos ());
|
||||||
|
JBUF_UNLOCK (session);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
pause:
|
||||||
|
{
|
||||||
|
const gchar *reason = gst_flow_get_name (result);
|
||||||
|
|
||||||
|
GST_DEBUG_OBJECT (rdtmanager, "pausing task, reason %s", reason);
|
||||||
|
|
||||||
|
JBUF_LOCK (session);
|
||||||
|
/* store result */
|
||||||
|
session->srcresult = result;
|
||||||
|
/* we don't post errors or anything because upstream will do that for us
|
||||||
|
* when we pass the return value upstream. */
|
||||||
|
gst_pad_pause_task (session->recv_rtp_src);
|
||||||
|
JBUF_UNLOCK (session);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
static GstFlowReturn
|
static GstFlowReturn
|
||||||
gst_rdt_manager_chain_rtcp (GstPad * pad, GstBuffer * buffer)
|
gst_rdt_manager_chain_rtcp (GstPad * pad, GstBuffer * buffer)
|
||||||
{
|
{
|
||||||
|
@ -771,7 +1067,7 @@ create_recv_rtp (GstRDTManager * rdtmanager, GstPadTemplate * templ,
|
||||||
session->recv_rtp_sink = gst_pad_new_from_template (templ, name);
|
session->recv_rtp_sink = gst_pad_new_from_template (templ, name);
|
||||||
gst_pad_set_element_private (session->recv_rtp_sink, session);
|
gst_pad_set_element_private (session->recv_rtp_sink, session);
|
||||||
gst_pad_set_chain_function (session->recv_rtp_sink,
|
gst_pad_set_chain_function (session->recv_rtp_sink,
|
||||||
gst_rdt_manager_chain_rtp);
|
gst_rdt_manager_chain_rdt);
|
||||||
gst_pad_set_active (session->recv_rtp_sink, TRUE);
|
gst_pad_set_active (session->recv_rtp_sink, TRUE);
|
||||||
gst_element_add_pad (GST_ELEMENT_CAST (rdtmanager), session->recv_rtp_sink);
|
gst_element_add_pad (GST_ELEMENT_CAST (rdtmanager), session->recv_rtp_sink);
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue