gst/rtsp/gstrtpdec.*: Add (dummy) SSRC management signals.

Original commit message from CVS:
* gst/rtsp/gstrtpdec.c: (gst_rtp_dec_marshal_VOID__UINT_UINT),
(gst_rtp_dec_class_init):
* gst/rtsp/gstrtpdec.h:
Add (dummy) SSRC management signals.
* gst/rtsp/gstrtspsrc.c: (gst_rtspsrc_class_init),
(gst_rtspsrc_set_property), (gst_rtspsrc_get_property),
(find_stream), (gst_rtspsrc_create_stream), (new_session_pad),
(request_pt_map), (gst_rtspsrc_do_stream_eos), (on_bye_ssrc),
(on_timeout), (gst_rtspsrc_stream_configure_manager),
(gst_rtspsrc_stream_push_event), (gst_rtspsrc_push_event),
(gst_rtspsrc_loop_interleaved), (gst_rtspsrc_parse_rtpinfo),
(gst_rtspsrc_handle_message), (gst_rtspsrc_change_state):
* gst/rtsp/gstrtspsrc.h:
Add connection-speed property.
Add find_stream helper functions.
Handle stream EOS based on BYE messages or SSRC timeout.
Returns SUCCESS from the state change function as we hide our async
elements from the parent.
This commit is contained in:
Wim Taymans 2007-08-16 11:47:19 +00:00
parent a490cffe5f
commit 41f0496738
5 changed files with 295 additions and 58 deletions

View file

@ -1,3 +1,25 @@
2007-08-16 Wim Taymans <wim.taymans@gmail.com>
* gst/rtsp/gstrtpdec.c: (gst_rtp_dec_marshal_VOID__UINT_UINT),
(gst_rtp_dec_class_init):
* gst/rtsp/gstrtpdec.h:
Add (dummy) SSRC management signals.
* gst/rtsp/gstrtspsrc.c: (gst_rtspsrc_class_init),
(gst_rtspsrc_set_property), (gst_rtspsrc_get_property),
(find_stream), (gst_rtspsrc_create_stream), (new_session_pad),
(request_pt_map), (gst_rtspsrc_do_stream_eos), (on_bye_ssrc),
(on_timeout), (gst_rtspsrc_stream_configure_manager),
(gst_rtspsrc_stream_push_event), (gst_rtspsrc_push_event),
(gst_rtspsrc_loop_interleaved), (gst_rtspsrc_parse_rtpinfo),
(gst_rtspsrc_handle_message), (gst_rtspsrc_change_state):
* gst/rtsp/gstrtspsrc.h:
Add connection-speed property.
Add find_stream helper functions.
Handle stream EOS based on BYE messages or SSRC timeout.
Returns SUCCESS from the state change function as we hide our async
elements from the parent.
2007-08-16 Stefan Kost <ensonic@users.sf.net>
* gst/debug/rndbuffersize.c:

View file

@ -78,6 +78,13 @@ enum
{
SIGNAL_REQUEST_PT_MAP,
SIGNAL_CLEAR_PT_MAP,
SIGNAL_ON_NEW_SSRC,
SIGNAL_ON_SSRC_COLLISION,
SIGNAL_ON_SSRC_VALIDATED,
SIGNAL_ON_BYE_SSRC,
SIGNAL_ON_BYE_TIMEOUT,
SIGNAL_ON_TIMEOUT,
LAST_SIGNAL
};
@ -255,6 +262,37 @@ gst_rtp_dec_marshal_BOXED__UINT_UINT (GClosure * closure,
g_value_take_boxed (return_value, v_return);
}
void
gst_rtp_dec_marshal_VOID__UINT_UINT (GClosure * closure,
GValue * return_value,
guint n_param_values,
const GValue * param_values,
gpointer invocation_hint, gpointer marshal_data)
{
typedef void (*GMarshalFunc_VOID__UINT_UINT) (gpointer data1,
guint arg_1, guint arg_2, gpointer data2);
register GMarshalFunc_VOID__UINT_UINT callback;
register GCClosure *cc = (GCClosure *) closure;
register gpointer data1, data2;
g_return_if_fail (n_param_values == 3);
if (G_CCLOSURE_SWAP_DATA (closure)) {
data1 = closure->data;
data2 = g_value_peek_pointer (param_values + 0);
} else {
data1 = g_value_peek_pointer (param_values + 0);
data2 = closure->data;
}
callback =
(GMarshalFunc_VOID__UINT_UINT) (marshal_data ? marshal_data : cc->
callback);
callback (data1,
g_marshal_value_peek_uint (param_values + 1),
g_marshal_value_peek_uint (param_values + 2), data2);
}
static void
gst_rtp_dec_class_init (GstRTPDecClass * g_class)
{
@ -294,6 +332,87 @@ gst_rtp_dec_class_init (GstRTPDecClass * g_class)
G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRTPDecClass, clear_pt_map),
NULL, NULL, g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0, G_TYPE_NONE);
/**
* GstRTPDec::on-new-ssrc:
* @rtpbin: the object which received the signal
* @session: the session
* @ssrc: the SSRC
*
* Notify of a new SSRC that entered @session.
*/
gst_rtp_dec_signals[SIGNAL_ON_NEW_SSRC] =
g_signal_new ("on-new-ssrc", G_TYPE_FROM_CLASS (klass),
G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRTPDecClass, on_new_ssrc),
NULL, NULL, gst_rtp_dec_marshal_VOID__UINT_UINT, G_TYPE_NONE, 2,
G_TYPE_UINT, G_TYPE_UINT);
/**
* GstRTPDec::on-ssrc_collision:
* @rtpbin: the object which received the signal
* @session: the session
* @ssrc: the SSRC
*
* Notify when we have an SSRC collision
*/
gst_rtp_dec_signals[SIGNAL_ON_SSRC_COLLISION] =
g_signal_new ("on-ssrc-collision", G_TYPE_FROM_CLASS (klass),
G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRTPDecClass, on_ssrc_collision),
NULL, NULL, gst_rtp_dec_marshal_VOID__UINT_UINT, G_TYPE_NONE, 2,
G_TYPE_UINT, G_TYPE_UINT);
/**
* GstRTPDec::on-ssrc_validated:
* @rtpbin: the object which received the signal
* @session: the session
* @ssrc: the SSRC
*
* Notify of a new SSRC that became validated.
*/
gst_rtp_dec_signals[SIGNAL_ON_SSRC_VALIDATED] =
g_signal_new ("on-ssrc-validated", G_TYPE_FROM_CLASS (klass),
G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRTPDecClass, on_ssrc_validated),
NULL, NULL, gst_rtp_dec_marshal_VOID__UINT_UINT, G_TYPE_NONE, 2,
G_TYPE_UINT, G_TYPE_UINT);
/**
* GstRTPDec::on-bye-ssrc:
* @rtpbin: the object which received the signal
* @session: the session
* @ssrc: the SSRC
*
* Notify of an SSRC that became inactive because of a BYE packet.
*/
gst_rtp_dec_signals[SIGNAL_ON_BYE_SSRC] =
g_signal_new ("on-bye-ssrc", G_TYPE_FROM_CLASS (klass),
G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRTPDecClass, on_bye_ssrc),
NULL, NULL, gst_rtp_dec_marshal_VOID__UINT_UINT, G_TYPE_NONE, 2,
G_TYPE_UINT, G_TYPE_UINT);
/**
* GstRTPDec::on-bye-timeout:
* @rtpbin: the object which received the signal
* @session: the session
* @ssrc: the SSRC
*
* Notify of an SSRC that has timed out because of BYE
*/
gst_rtp_dec_signals[SIGNAL_ON_BYE_TIMEOUT] =
g_signal_new ("on-bye-timeout", G_TYPE_FROM_CLASS (klass),
G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRTPDecClass, on_bye_timeout),
NULL, NULL, gst_rtp_dec_marshal_VOID__UINT_UINT, G_TYPE_NONE, 2,
G_TYPE_UINT, G_TYPE_UINT);
/**
* GstRTPDec::on-timeout:
* @rtpbin: the object which received the signal
* @session: the session
* @ssrc: the SSRC
*
* Notify of an SSRC that has timed out
*/
gst_rtp_dec_signals[SIGNAL_ON_TIMEOUT] =
g_signal_new ("on-timeout", G_TYPE_FROM_CLASS (klass),
G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRTPDecClass, on_timeout),
NULL, NULL, gst_rtp_dec_marshal_VOID__UINT_UINT, G_TYPE_NONE, 2,
G_TYPE_UINT, G_TYPE_UINT);
gstelement_class->provide_clock =
GST_DEBUG_FUNCPTR (gst_rtp_dec_provide_clock);
gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_rtp_dec_change_state);

View file

@ -72,6 +72,13 @@ struct _GstRTPDecClass {
GstCaps* (*request_pt_map) (GstRTPDec *rtpdec, guint session, guint pt);
void (*clear_pt_map) (GstRTPDec *rtpdec);
void (*on_new_ssrc) (GstRTPDec *rtpdec, guint session, guint32 ssrc);
void (*on_ssrc_collision) (GstRTPDec *rtpdec, guint session, guint32 ssrc);
void (*on_ssrc_validated) (GstRTPDec *rtpdec, guint session, guint32 ssrc);
void (*on_bye_ssrc) (GstRTPDec *rtpdec, guint session, guint32 ssrc);
void (*on_bye_timeout) (GstRTPDec *rtpdec, guint session, guint32 ssrc);
void (*on_timeout) (GstRTPDec *rtpdec, guint session, guint32 ssrc);
};
GType gst_rtp_dec_get_type(void);

View file

@ -140,6 +140,7 @@ enum
#define DEFAULT_TIMEOUT 5000000
#define DEFAULT_TCP_TIMEOUT 20000000
#define DEFAULT_LATENCY_MS 3000
#define DEFAULT_CONNECTION_SPEED 0
enum
{
@ -151,6 +152,7 @@ enum
PROP_TIMEOUT,
PROP_TCP_TIMEOUT,
PROP_LATENCY,
PROP_CONNECTION_SPEED
};
#define GST_TYPE_RTSP_LOWER_TRANS (gst_rtsp_lower_trans_get_type())
@ -201,6 +203,8 @@ static gboolean gst_rtspsrc_uri_set_uri (GstURIHandler * handler,
static gboolean gst_rtspsrc_activate_streams (GstRTSPSrc * src);
static void gst_rtspsrc_loop (GstRTSPSrc * src);
static void gst_rtspsrc_stream_push_event (GstRTSPSrc * src,
GstRTSPStream * stream, GstEvent * event);
static void gst_rtspsrc_push_event (GstRTSPSrc * src, GstEvent * event);
/* commands we send to out loop to notify it of events */
@ -292,6 +296,12 @@ gst_rtspsrc_class_init (GstRTSPSrcClass * klass)
"Amount of ms to buffer", 0, G_MAXUINT, DEFAULT_LATENCY_MS,
G_PARAM_READWRITE | G_PARAM_CONSTRUCT));
g_object_class_install_property (gobject_class, PROP_CONNECTION_SPEED,
g_param_spec_uint ("connection-speed", "Connection Speed",
"Network connection speed in kbps (0 = unknown)",
0, G_MAXINT / 1000, DEFAULT_CONNECTION_SPEED,
G_PARAM_READWRITE | G_PARAM_CONSTRUCT));
gstelement_class->change_state = gst_rtspsrc_change_state;
gstbin_class->handle_message = gst_rtspsrc_handle_message;
@ -376,6 +386,9 @@ gst_rtspsrc_set_property (GObject * object, guint prop_id, const GValue * value,
case PROP_LATENCY:
rtspsrc->latency = g_value_get_uint (value);
break;
case PROP_CONNECTION_SPEED:
rtspsrc->connection_speed = g_value_get_uint (value);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
@ -418,6 +431,9 @@ gst_rtspsrc_get_property (GObject * object, guint prop_id, GValue * value,
case PROP_LATENCY:
g_value_set_uint (value, rtspsrc->latency);
break;
case PROP_CONNECTION_SPEED:
g_value_set_uint (value, rtspsrc->connection_speed);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
@ -487,6 +503,18 @@ find_stream_by_setup (GstRTSPStream * stream, gconstpointer a)
return -1;
}
GstRTSPStream *
find_stream (GstRTSPSrc * src, gconstpointer data, gconstpointer func)
{
GList *lstream;
/* find and get stream */
if ((lstream = g_list_find_custom (src->streams, data, (GCompareFunc) func)))
return (GstRTSPStream *) lstream->data;
return NULL;
}
static GstRTSPStream *
gst_rtspsrc_create_stream (GstRTSPSrc * src, GstSDPMessage * sdp, gint idx)
{
@ -520,8 +548,7 @@ gst_rtspsrc_create_stream (GstRTSPSrc * src, GstSDPMessage * sdp, gint idx)
/* If we have a dynamic payload type, see if we have a stream with the
* same payload number. If there is one, they are part of the same
* container and we only need to add one pad. */
if (g_list_find_custom (src->streams, GINT_TO_POINTER (stream->pt),
(GCompareFunc) find_stream_by_pt)) {
if (find_stream (src, GINT_TO_POINTER (stream->pt), find_stream_by_pt)) {
stream->container = TRUE;
}
}
@ -532,7 +559,7 @@ gst_rtspsrc_create_stream (GstRTSPSrc * src, GstSDPMessage * sdp, gint idx)
* the RTP-Info header field returned from PLAY. */
control_url = gst_sdp_media_get_attribute_val (media, "control");
GST_DEBUG_OBJECT (src, "stream %d", stream->id);
GST_DEBUG_OBJECT (src, "stream %d, (%p)", stream->id, stream);
GST_DEBUG_OBJECT (src, " pt: %d", stream->pt);
GST_DEBUG_OBJECT (src, " container: %d", stream->container);
GST_DEBUG_OBJECT (src, " caps: %" GST_PTR_FORMAT, stream->caps);
@ -1382,14 +1409,10 @@ new_session_pad (GstElement * session, GstPad * pad, GstRTSPSrc * src)
GST_DEBUG_OBJECT (src, "stream: %u, SSRC %d, PT %d", id, ssrc, pt);
lstream = g_list_find_custom (src->streams, GINT_TO_POINTER (id),
(GCompareFunc) find_stream_by_id);
if (lstream == NULL)
stream = find_stream (src, GINT_TO_POINTER (id), find_stream_by_id);
if (stream == NULL)
goto unknown_stream;
/* get stream */
stream = (GstRTSPStream *) lstream->data;
/* create a new pad we will use to stream to */
template = gst_static_pad_template_get (&rtptemplate);
stream->srcpad = gst_ghost_pad_new_from_template (name, pad, template);
@ -1436,18 +1459,15 @@ static GstCaps *
request_pt_map (GstElement * sess, guint session, guint pt, GstRTSPSrc * src)
{
GstRTSPStream *stream;
GList *lstream;
GstCaps *caps;
GST_DEBUG_OBJECT (src, "getting pt map for pt %d in session %d", pt, session);
GST_RTSP_STATE_LOCK (src);
lstream = g_list_find_custom (src->streams, GINT_TO_POINTER (session),
(GCompareFunc) find_stream_by_id);
if (!lstream)
stream = find_stream (src, GINT_TO_POINTER (session), find_stream_by_id);
if (!stream)
goto unknown_stream;
stream = (GstRTSPStream *) lstream->data;
caps = stream->caps;
GST_RTSP_STATE_UNLOCK (src);
@ -1461,6 +1481,55 @@ unknown_stream:
}
}
static void
gst_rtspsrc_do_stream_eos (GstRTSPSrc * src, guint session)
{
GstRTSPStream *stream;
GST_DEBUG_OBJECT (src, "setting stream for session %u to EOS", session);
/* get stream for session */
stream = find_stream (src, GINT_TO_POINTER (session), find_stream_by_id);
if (!stream)
goto unknown_stream;
if (stream->eos)
goto was_eos;
stream->eos = TRUE;
gst_rtspsrc_stream_push_event (src, stream, gst_event_new_eos ());
return;
/* ERRORS */
unknown_stream:
{
GST_DEBUG_OBJECT (src, "unknown stream for session %u", session);
return;
}
was_eos:
{
GST_DEBUG_OBJECT (src, "stream for session %u was EOS already %u", session);
return;
}
}
static void
on_bye_ssrc (GstElement * manager, guint session, guint32 ssrc,
GstRTSPSrc * src)
{
GST_DEBUG_OBJECT (src, "SSRC %08x in session %u received BYE", ssrc, session);
gst_rtspsrc_do_stream_eos (src, session);
}
static void
on_timeout (GstElement * manager, guint session, guint32 ssrc, GstRTSPSrc * src)
{
GST_DEBUG_OBJECT (src, "SSRC %08x in session %u timed out", ssrc, session);
gst_rtspsrc_do_stream_eos (src, session);
}
/* try to get and configure a manager */
static gboolean
gst_rtspsrc_stream_configure_manager (GstRTSPSrc * src, GstRTSPStream * stream,
@ -1505,13 +1574,20 @@ gst_rtspsrc_stream_configure_manager (GstRTSPSrc * src, GstRTSPStream * stream,
g_object_set (src->session, "latency", src->latency, NULL);
/* connect to signals if we did not already do so */
GST_DEBUG_OBJECT (src, "connect to signals on session manager");
GST_DEBUG_OBJECT (src, "connect to signals on session manager, stream %p",
stream);
src->session_sig_id =
g_signal_connect (src->session, "pad-added",
(GCallback) new_session_pad, src);
src->session_ptmap_id =
g_signal_connect (src->session, "request-pt-map",
(GCallback) request_pt_map, src);
g_signal_connect (src->session, "on-bye-ssrc", (GCallback) on_bye_ssrc,
src);
g_signal_connect (src->session, "on-bye-timeout", (GCallback) on_timeout,
src);
g_signal_connect (src->session, "on-timeout", (GCallback) on_timeout,
src);
}
/* we stream directly to the manager, get some pads. Each RTSP stream goes
@ -2055,6 +2131,34 @@ done:
return ret;
}
static void
gst_rtspsrc_stream_push_event (GstRTSPSrc * src, GstRTSPStream * stream,
GstEvent * event)
{
/* only streams that have a connection to the outside world */
if (stream->srcpad == NULL)
goto done;
if (stream->channelpad[0]) {
gst_event_ref (event);
if (GST_PAD_IS_SRC (stream->channelpad[0]))
gst_pad_push_event (stream->channelpad[0], event);
else
gst_pad_send_event (stream->channelpad[0], event);
}
if (stream->channelpad[1]) {
gst_event_ref (event);
if (GST_PAD_IS_SRC (stream->channelpad[1]))
gst_pad_push_event (stream->channelpad[1], event);
else
gst_pad_send_event (stream->channelpad[1], event);
}
done:
gst_event_unref (event);
}
static void
gst_rtspsrc_push_event (GstRTSPSrc * src, GstEvent * event)
{
@ -2063,25 +2167,8 @@ gst_rtspsrc_push_event (GstRTSPSrc * src, GstEvent * event)
for (streams = src->streams; streams; streams = g_list_next (streams)) {
GstRTSPStream *ostream = (GstRTSPStream *) streams->data;
/* only streams that have a connection to the outside world */
if (ostream->srcpad == NULL)
continue;
if (ostream->channelpad[0]) {
gst_event_ref (event);
if (GST_PAD_IS_SRC (ostream->channelpad[0]))
gst_pad_push_event (ostream->channelpad[0], event);
else
gst_pad_send_event (ostream->channelpad[0], event);
}
if (ostream->channelpad[1]) {
gst_event_ref (event);
if (GST_PAD_IS_SRC (ostream->channelpad[1]))
gst_pad_push_event (ostream->channelpad[1], event);
else
gst_pad_send_event (ostream->channelpad[1], event);
}
gst_rtspsrc_stream_push_event (src, ostream, event);
}
gst_event_unref (event);
}
@ -2168,7 +2255,6 @@ gst_rtspsrc_loop_interleaved (GstRTSPSrc * src)
GstRTSPMessage message = { 0 };
GstRTSPResult res;
gint channel;
GList *lstream;
GstRTSPStream *stream;
GstPad *outpad = NULL;
guint8 *data;
@ -2210,6 +2296,8 @@ gst_rtspsrc_loop_interleaved (GstRTSPSrc * src)
/* unset flushing so we can do something else */
gst_rtsp_connection_flush (src->connection, FALSE);
goto interrupt;
case GST_RTSP_ETIMEOUT:
goto timeout;
default:
goto receive_error;
}
@ -2238,12 +2326,10 @@ gst_rtspsrc_loop_interleaved (GstRTSPSrc * src)
channel = message.type_data.data.channel;
lstream = g_list_find_custom (src->streams, GINT_TO_POINTER (channel),
(GCompareFunc) find_stream_by_channel);
if (!lstream)
stream = find_stream (src, GINT_TO_POINTER (channel), find_stream_by_channel);
if (!stream)
goto unknown_stream;
stream = (GstRTSPStream *) lstream->data;
if (channel == stream->channel[0]) {
outpad = stream->channelpad[0];
is_rtcp = FALSE;
@ -2319,6 +2405,13 @@ unknown_stream:
gst_rtsp_message_unset (&message);
return;
}
timeout:
{
GST_DEBUG_OBJECT (src, "we got a timeout");
gst_rtsp_message_unset (&message);
ret = GST_FLOW_UNEXPECTED;
goto need_pause;
}
interrupt:
{
GST_DEBUG_OBJECT (src, "we got interrupted");
@ -3745,13 +3838,8 @@ gst_rtspsrc_parse_rtpinfo (GstRTSPSrc * src, gchar * rtpinfo)
/* remove leading whitespace */
fields[j] = g_strchug (fields[j]);
if (g_str_has_prefix (fields[j], "url=")) {
GList *lstream;
/* get the url and the stream */
lstream = g_list_find_custom (src->streams, (fields[j] + 4),
(GCompareFunc) find_stream_by_setup);
if (lstream)
stream = (GstRTSPStream *) lstream->data;
stream = find_stream (src, (fields[j] + 4), find_stream_by_setup);
} else if (g_str_has_prefix (fields[j], "seq=")) {
seqbase = atoi (fields[j] + 4);
} else if (g_str_has_prefix (fields[j], "rtptime=")) {
@ -3983,7 +4071,6 @@ gst_rtspsrc_handle_message (GstBin * bin, GstMessage * message)
case GST_MESSAGE_ERROR:
{
GstObject *udpsrc;
GList *lstream;
GstRTSPStream *stream;
GstFlowReturn ret;
@ -3992,13 +4079,10 @@ gst_rtspsrc_handle_message (GstBin * bin, GstMessage * message)
GST_DEBUG_OBJECT (rtspsrc, "got error from %s",
GST_ELEMENT_NAME (udpsrc));
lstream = g_list_find_custom (rtspsrc->streams, udpsrc,
(GCompareFunc) find_stream_by_udpsrc);
if (!lstream)
stream = find_stream (rtspsrc, udpsrc, find_stream_by_udpsrc);
if (!stream)
goto forward;
stream = (GstRTSPStream *) lstream->data;
/* we ignore the RTCP udpsrc */
if (stream->udpsrc[1] == GST_ELEMENT_CAST (udpsrc))
goto done;
@ -4072,6 +4156,9 @@ gst_rtspsrc_change_state (GstElement * element, GstStateChange transition)
goto done;
switch (transition) {
case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
ret = GST_STATE_CHANGE_SUCCESS;
break;
case GST_STATE_CHANGE_READY_TO_PAUSED:
ret = GST_STATE_CHANGE_NO_PREROLL;
break;

View file

@ -90,6 +90,7 @@ struct _GstRTSPStream {
GstFlowReturn last_ret;
gboolean added;
gboolean disabled;
gboolean eos;
/* for interleaved mode */
guint8 channel[2];
@ -152,6 +153,7 @@ struct _GstRTSPSrc {
guint64 udp_timeout;
GTimeVal tcp_timeout;
guint latency;
guint connection_speed;
/* state */
GstRTSPState state;