stream: improve join and leave of the pipeline

simplify code
Do the cleanup properly
Add some docs
This commit is contained in:
Wim Taymans 2012-10-26 17:28:10 +02:00
parent 693dd3cfc4
commit 6f7d755894
3 changed files with 212 additions and 191 deletions

View file

@ -72,7 +72,6 @@ static gboolean default_handle_message (GstRTSPMedia * media,
GstMessage * message); GstMessage * message);
static void finish_unprepare (GstRTSPMedia * media); static void finish_unprepare (GstRTSPMedia * media);
static gboolean default_unprepare (GstRTSPMedia * media); static gboolean default_unprepare (GstRTSPMedia * media);
static void unlock_streams (GstRTSPMedia * media);
static guint gst_rtsp_media_signals[SIGNAL_LAST] = { 0 }; static guint gst_rtsp_media_signals[SIGNAL_LAST] = { 0 };
@ -895,22 +894,6 @@ weird_type:
} }
} }
static void
unlock_streams (GstRTSPMedia * media)
{
guint i;
/* unlock the udp src elements */
for (i = 0; i < media->streams->len; i++) {
GstRTSPStream *stream;
stream = g_ptr_array_index (media->streams, i);
gst_element_set_locked_state (stream->udpsrc[0], FALSE);
gst_element_set_locked_state (stream->udpsrc[1], FALSE);
}
}
static void static void
gst_rtsp_media_set_status (GstRTSPMedia * media, GstRTSPMediaStatus status) gst_rtsp_media_set_status (GstRTSPMedia * media, GstRTSPMediaStatus status)
{ {
@ -1075,24 +1058,21 @@ static void
pad_added_cb (GstElement * element, GstPad * pad, GstRTSPMedia * media) pad_added_cb (GstElement * element, GstPad * pad, GstRTSPMedia * media)
{ {
GstRTSPStream *stream; GstRTSPStream *stream;
gint i;
stream = gst_rtsp_media_create_stream (media, element, pad); stream = gst_rtsp_media_create_stream (media, element, pad);
GST_INFO ("pad added %s:%s, stream %d", GST_DEBUG_PAD_NAME (pad), GST_INFO ("pad added %s:%s, stream %d", GST_DEBUG_PAD_NAME (pad),
stream->idx); stream->idx);
/* we will be adding elements below that will cause ASYNC_DONE to be
* posted in the bus. We want to ignore those messages until the
* pipeline really prerolled. */
media->adding = TRUE; media->adding = TRUE;
gst_rtsp_stream_join_bin (stream, GST_BIN (media->pipeline), media->rtpbin); /* join the element in the PAUSED state because this callback is
* called from the streaming thread and it is PAUSED */
for (i = 0; i < 2; i++) { gst_rtsp_stream_join_bin (stream, GST_BIN (media->pipeline),
gst_element_set_state (stream->udpsink[i], GST_STATE_PAUSED); media->rtpbin, GST_STATE_PAUSED);
gst_element_set_state (stream->appsink[i], GST_STATE_PAUSED);
gst_element_set_state (stream->appqueue[i], GST_STATE_PAUSED);
gst_element_set_state (stream->tee[i], GST_STATE_PAUSED);
gst_element_set_state (stream->selector[i], GST_STATE_PAUSED);
gst_element_set_state (stream->appsrc[i], GST_STATE_PAUSED);
}
media->adding = FALSE; media->adding = FALSE;
} }
@ -1173,7 +1153,8 @@ gst_rtsp_media_prepare (GstRTSPMedia * media)
stream = g_ptr_array_index (media->streams, i); stream = g_ptr_array_index (media->streams, i);
gst_rtsp_stream_join_bin (stream, GST_BIN (media->pipeline), media->rtpbin); gst_rtsp_stream_join_bin (stream, GST_BIN (media->pipeline),
media->rtpbin, GST_STATE_NULL);
} }
for (walk = media->dynamic; walk; walk = g_list_next (walk)) { for (walk = media->dynamic; walk; walk = g_list_next (walk)) {
@ -1208,7 +1189,7 @@ gst_rtsp_media_prepare (GstRTSPMedia * media)
/* we need to go to PLAYING */ /* we need to go to PLAYING */
GST_INFO ("NO_PREROLL state change: live media %p", media); GST_INFO ("NO_PREROLL state change: live media %p", media);
/* FIXME we disable seeking for live streams for now. We should perform a /* FIXME we disable seeking for live streams for now. We should perform a
* seeking query in preroll instead and do a seeking query. */ * seeking query in preroll instead */
media->seekable = FALSE; media->seekable = FALSE;
media->is_live = TRUE; media->is_live = TRUE;
ret = gst_element_set_state (media->pipeline, GST_STATE_PLAYING); ret = gst_element_set_state (media->pipeline, GST_STATE_PLAYING);
@ -1219,7 +1200,8 @@ gst_rtsp_media_prepare (GstRTSPMedia * media)
goto state_failed; goto state_failed;
} }
/* now wait for all pads to be prerolled */ /* now wait for all pads to be prerolled, FIXME, we should somehow be
* able to do this async so that we don't block the server thread. */
status = gst_rtsp_media_get_status (media); status = gst_rtsp_media_get_status (media);
if (status == GST_RTSP_MEDIA_STATUS_ERROR) if (status == GST_RTSP_MEDIA_STATUS_ERROR)
goto state_failed; goto state_failed;
@ -1262,7 +1244,6 @@ finish_unprepare (GstRTSPMedia * media)
GST_DEBUG ("shutting down"); GST_DEBUG ("shutting down");
unlock_streams (media);
gst_element_set_state (media->pipeline, GST_STATE_NULL); gst_element_set_state (media->pipeline, GST_STATE_NULL);
for (i = 0; i < media->streams->len; i++) { for (i = 0; i < media->streams->len; i++) {
@ -1278,6 +1259,7 @@ finish_unprepare (GstRTSPMedia * media)
g_ptr_array_set_size (media->streams, 0); g_ptr_array_set_size (media->streams, 0);
gst_bin_remove (GST_BIN (media->pipeline), media->rtpbin); gst_bin_remove (GST_BIN (media->pipeline), media->rtpbin);
media->rtpbin = NULL;
gst_object_unref (media->pipeline); gst_object_unref (media->pipeline);
media->pipeline = NULL; media->pipeline = NULL;

View file

@ -68,28 +68,12 @@ gst_rtsp_stream_finalize (GObject * obj)
stream = GST_RTSP_STREAM (obj); stream = GST_RTSP_STREAM (obj);
g_assert (!stream->is_joined); /* we really need to be unjoined now */
g_return_if_fail (!stream->is_joined);
gst_object_unref (stream->payloader); gst_object_unref (stream->payloader);
gst_object_unref (stream->srcpad); gst_object_unref (stream->srcpad);
if (stream->session)
g_object_unref (stream->session);
if (stream->caps)
gst_caps_unref (stream->caps);
if (stream->send_rtp_sink)
gst_object_unref (stream->send_rtp_sink);
if (stream->send_rtp_src)
gst_object_unref (stream->send_rtp_src);
if (stream->send_rtcp_src)
gst_object_unref (stream->send_rtcp_src);
if (stream->recv_rtcp_sink)
gst_object_unref (stream->recv_rtcp_sink);
if (stream->recv_rtp_sink)
gst_object_unref (stream->recv_rtp_sink);
g_list_free (stream->transports); g_list_free (stream->transports);
G_OBJECT_CLASS (gst_rtsp_stream_parent_class)->finalize (obj); G_OBJECT_CLASS (gst_rtsp_stream_parent_class)->finalize (obj);
@ -565,16 +549,18 @@ static GstAppSinkCallbacks sink_cb = {
* @stream: a #GstRTSPStream * @stream: a #GstRTSPStream
* @bin: a #GstBin to join * @bin: a #GstBin to join
* @rtpbin: a rtpbin element in @bin * @rtpbin: a rtpbin element in @bin
* @state: the target state of the new elements
* *
* Join the #Gstbin @bin that contains the element @rtpbin. * Join the #Gstbin @bin that contains the element @rtpbin.
* *
* @stream will link to @rtpbin, which must be inside @bin. * @stream will link to @rtpbin, which must be inside @bin. The elements
* added to @bin will be set to the state given in @state.
* *
* Returns: %TRUE on success. * Returns: %TRUE on success.
*/ */
gboolean gboolean
gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin, gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin,
GstElement * rtpbin) GstElement * rtpbin, GstState state)
{ {
gint i, idx; gint i, idx;
gchar *name; gchar *name;
@ -585,52 +571,42 @@ gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin,
g_return_val_if_fail (GST_IS_BIN (bin), FALSE); g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE); g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
idx = stream->idx;
if (stream->is_joined) if (stream->is_joined)
return TRUE; return TRUE;
GST_INFO ("stream %p joining bin", stream); /* create a session with the same index as the stream */
idx = stream->idx;
GST_INFO ("stream %p joining bin as session %d", stream, idx);
if (!alloc_ports (stream)) if (!alloc_ports (stream))
goto no_ports; goto no_ports;
/* add the ports to the pipeline */ /* get a pad for sending RTP */
for (i = 0; i < 2; i++) {
gst_bin_add (bin, stream->udpsink[i]);
gst_bin_add (bin, stream->udpsrc[i]);
}
/* create elements for the TCP transfer */
for (i = 0; i < 2; i++) {
stream->appsrc[i] = gst_element_factory_make ("appsrc", NULL);
stream->appqueue[i] = gst_element_factory_make ("queue", NULL);
stream->appsink[i] = gst_element_factory_make ("appsink", NULL);
g_object_set (stream->appsink[i], "async", FALSE, "sync", FALSE, NULL);
g_object_set (stream->appsink[i], "emit-signals", FALSE, NULL);
gst_bin_add (bin, stream->appqueue[i]);
gst_bin_add (bin, stream->appsink[i]);
gst_bin_add (bin, stream->appsrc[i]);
gst_app_sink_set_callbacks (GST_APP_SINK_CAST (stream->appsink[i]),
&sink_cb, stream, NULL);
}
/* hook up the stream to the RTP session elements. */
name = g_strdup_printf ("send_rtp_sink_%u", idx); name = g_strdup_printf ("send_rtp_sink_%u", idx);
stream->send_rtp_sink = gst_element_get_request_pad (rtpbin, name); stream->send_rtp_sink = gst_element_get_request_pad (rtpbin, name);
g_free (name); g_free (name);
/* link the RTP pad to the session manager, it should not really fail unless
* this is not really an RTP pad */
ret = gst_pad_link (stream->srcpad, stream->send_rtp_sink);
if (ret != GST_PAD_LINK_OK)
goto link_failed;
/* get pads from the RTP session element for sending and receiving
* RTP/RTCP*/
name = g_strdup_printf ("send_rtp_src_%u", idx); name = g_strdup_printf ("send_rtp_src_%u", idx);
stream->send_rtp_src = gst_element_get_static_pad (rtpbin, name); stream->send_src[0] = gst_element_get_static_pad (rtpbin, name);
g_free (name); g_free (name);
name = g_strdup_printf ("send_rtcp_src_%u", idx); name = g_strdup_printf ("send_rtcp_src_%u", idx);
stream->send_rtcp_src = gst_element_get_request_pad (rtpbin, name); stream->send_src[1] = gst_element_get_request_pad (rtpbin, name);
g_free (name);
name = g_strdup_printf ("recv_rtcp_sink_%u", idx);
stream->recv_rtcp_sink = gst_element_get_request_pad (rtpbin, name);
g_free (name); g_free (name);
name = g_strdup_printf ("recv_rtp_sink_%u", idx); name = g_strdup_printf ("recv_rtp_sink_%u", idx);
stream->recv_rtp_sink = gst_element_get_request_pad (rtpbin, name); stream->recv_sink[0] = gst_element_get_request_pad (rtpbin, name);
g_free (name); g_free (name);
name = g_strdup_printf ("recv_rtcp_sink_%u", idx);
stream->recv_sink[1] = gst_element_get_request_pad (rtpbin, name);
g_free (name);
/* get the session */ /* get the session */
g_signal_emit_by_name (rtpbin, "get-internal-session", idx, &stream->session); g_signal_emit_by_name (rtpbin, "get-internal-session", idx, &stream->session);
@ -647,110 +623,119 @@ gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin,
g_signal_connect (stream->session, "on-timeout", (GCallback) on_timeout, g_signal_connect (stream->session, "on-timeout", (GCallback) on_timeout,
stream); stream);
/* link the RTP pad to the session manager */ for (i = 0; i < 2; i++) {
ret = gst_pad_link (stream->srcpad, stream->send_rtp_sink); /* For the sender we create this bit of pipeline for both
if (ret != GST_PAD_LINK_OK) * RTP and RTCP. Sync and preroll are enabled on udpsink so
goto link_failed; * we need to add a queue before appsink to make the pipeline
* not block. For the TCP case, we want to pump data to the
* client as fast as possible anyway.
*
* .--------. .-----. .---------.
* | rtpbin | | tee | | udpsink |
* | send->sink src->sink |
* '--------' | | '---------'
* | | .---------. .---------.
* | | | queue | | appsink |
* | src->sink src->sink |
* '-----' '---------' '---------'
*/
/* make tee for RTP/RTCP */
stream->tee[i] = gst_element_factory_make ("tee", NULL);
gst_bin_add (bin, stream->tee[i]);
/* make tee for RTP and link to stream */ /* and link to rtpbin send pad */
stream->tee[0] = gst_element_factory_make ("tee", NULL); pad = gst_element_get_static_pad (stream->tee[i], "sink");
gst_bin_add (bin, stream->tee[0]); gst_pad_link (stream->send_src[i], pad);
pad = gst_element_get_static_pad (stream->tee[0], "sink");
gst_pad_link (stream->send_rtp_src, pad);
gst_object_unref (pad); gst_object_unref (pad);
/* link RTP sink, we're pretty sure this will work. */ /* add udpsink */
teepad = gst_element_get_request_pad (stream->tee[0], "src_%u"); gst_bin_add (bin, stream->udpsink[i]);
pad = gst_element_get_static_pad (stream->udpsink[0], "sink");
/* link tee to udpsink */
teepad = gst_element_get_request_pad (stream->tee[i], "src_%u");
pad = gst_element_get_static_pad (stream->udpsink[i], "sink");
gst_pad_link (teepad, pad); gst_pad_link (teepad, pad);
gst_object_unref (pad); gst_object_unref (pad);
gst_object_unref (teepad); gst_object_unref (teepad);
teepad = gst_element_get_request_pad (stream->tee[0], "src_%u"); /* make queue */
pad = gst_element_get_static_pad (stream->appqueue[0], "sink"); stream->appqueue[i] = gst_element_factory_make ("queue", NULL);
gst_bin_add (bin, stream->appqueue[i]);
/* and link to tee */
teepad = gst_element_get_request_pad (stream->tee[i], "src_%u");
pad = gst_element_get_static_pad (stream->appqueue[i], "sink");
gst_pad_link (teepad, pad); gst_pad_link (teepad, pad);
gst_object_unref (pad); gst_object_unref (pad);
gst_object_unref (teepad); gst_object_unref (teepad);
queuepad = gst_element_get_static_pad (stream->appqueue[0], "src"); /* make appsink */
pad = gst_element_get_static_pad (stream->appsink[0], "sink"); stream->appsink[i] = gst_element_factory_make ("appsink", NULL);
g_object_set (stream->appsink[i], "async", FALSE, "sync", FALSE, NULL);
g_object_set (stream->appsink[i], "emit-signals", FALSE, NULL);
gst_bin_add (bin, stream->appsink[i]);
gst_app_sink_set_callbacks (GST_APP_SINK_CAST (stream->appsink[i]),
&sink_cb, stream, NULL);
/* and link to queue */
queuepad = gst_element_get_static_pad (stream->appqueue[i], "src");
pad = gst_element_get_static_pad (stream->appsink[i], "sink");
gst_pad_link (queuepad, pad); gst_pad_link (queuepad, pad);
gst_object_unref (pad); gst_object_unref (pad);
gst_object_unref (queuepad); gst_object_unref (queuepad);
/* make tee for RTCP */ /* For the receiver we create this bit of pipeline for both
stream->tee[1] = gst_element_factory_make ("tee", NULL); * RTP and RTCP. We receive RTP/RTCP on appsrc and udpsrc
gst_bin_add (bin, stream->tee[1]); * and it is all funneled into the rtpbin receive pad.
*
* .--------. .--------. .--------.
* | udpsrc | | funnel | | rtpbin |
* | src->sink src->sink |
* '--------' | | '--------'
* .--------. | |
* | appsrc | | |
* | src->sink |
* '--------' '--------'
*/
/* make funnel for the RTP/RTCP receivers */
stream->funnel[i] = gst_element_factory_make ("funnel", NULL);
gst_bin_add (bin, stream->funnel[i]);
pad = gst_element_get_static_pad (stream->tee[1], "sink"); pad = gst_element_get_static_pad (stream->funnel[i], "src");
gst_pad_link (stream->send_rtcp_src, pad); gst_pad_link (pad, stream->recv_sink[i]);
gst_object_unref (pad); gst_object_unref (pad);
/* link RTCP elements */ /* add udpsrc */
teepad = gst_element_get_request_pad (stream->tee[1], "src_%u"); gst_bin_add (bin, stream->udpsrc[i]);
pad = gst_element_get_static_pad (stream->udpsink[1], "sink"); /* and link to the funnel */
gst_pad_link (teepad, pad); selpad = gst_element_get_request_pad (stream->funnel[i], "sink_%u");
gst_object_unref (pad); pad = gst_element_get_static_pad (stream->udpsrc[i], "src");
gst_object_unref (teepad);
teepad = gst_element_get_request_pad (stream->tee[1], "src_%u");
pad = gst_element_get_static_pad (stream->appqueue[1], "sink");
gst_pad_link (teepad, pad);
gst_object_unref (pad);
gst_object_unref (teepad);
queuepad = gst_element_get_static_pad (stream->appqueue[1], "src");
pad = gst_element_get_static_pad (stream->appsink[1], "sink");
gst_pad_link (queuepad, pad);
gst_object_unref (pad);
gst_object_unref (queuepad);
/* make selector for the RTP receivers */
stream->selector[0] = gst_element_factory_make ("funnel", NULL);
gst_bin_add (bin, stream->selector[0]);
pad = gst_element_get_static_pad (stream->selector[0], "src");
gst_pad_link (pad, stream->recv_rtp_sink);
gst_object_unref (pad);
selpad = gst_element_get_request_pad (stream->selector[0], "sink_%u");
pad = gst_element_get_static_pad (stream->udpsrc[0], "src");
gst_pad_link (pad, selpad);
gst_object_unref (pad);
gst_object_unref (selpad);
selpad = gst_element_get_request_pad (stream->selector[0], "sink_%u");
pad = gst_element_get_static_pad (stream->appsrc[0], "src");
gst_pad_link (pad, selpad); gst_pad_link (pad, selpad);
gst_object_unref (pad); gst_object_unref (pad);
gst_object_unref (selpad); gst_object_unref (selpad);
/* make selector for the RTCP receivers */ /* make and add appsrc */
stream->selector[1] = gst_element_factory_make ("funnel", NULL); stream->appsrc[i] = gst_element_factory_make ("appsrc", NULL);
gst_bin_add (bin, stream->selector[1]); gst_bin_add (bin, stream->appsrc[i]);
/* and link to the funnel */
pad = gst_element_get_static_pad (stream->selector[1], "src"); selpad = gst_element_get_request_pad (stream->funnel[i], "sink_%u");
gst_pad_link (pad, stream->recv_rtcp_sink); pad = gst_element_get_static_pad (stream->appsrc[i], "src");
gst_object_unref (pad);
selpad = gst_element_get_request_pad (stream->selector[1], "sink_%u");
pad = gst_element_get_static_pad (stream->udpsrc[1], "src");
gst_pad_link (pad, selpad);
gst_object_unref (pad);
gst_object_unref (selpad);
selpad = gst_element_get_request_pad (stream->selector[1], "sink_%u");
pad = gst_element_get_static_pad (stream->appsrc[1], "src");
gst_pad_link (pad, selpad); gst_pad_link (pad, selpad);
gst_object_unref (pad); gst_object_unref (pad);
gst_object_unref (selpad); gst_object_unref (selpad);
/* check if we need to set to a special state */
if (state != GST_STATE_NULL) {
gst_element_set_state (stream->udpsink[i], state);
gst_element_set_state (stream->appsink[i], state);
gst_element_set_state (stream->appqueue[i], state);
gst_element_set_state (stream->tee[i], state);
gst_element_set_state (stream->funnel[i], state);
gst_element_set_state (stream->appsrc[i], state);
}
/* we set and keep these to playing so that they don't cause NO_PREROLL return /* we set and keep these to playing so that they don't cause NO_PREROLL return
* values */ * values */
gst_element_set_state (stream->udpsrc[0], GST_STATE_PLAYING); gst_element_set_state (stream->udpsrc[i], GST_STATE_PLAYING);
gst_element_set_state (stream->udpsrc[1], GST_STATE_PLAYING); gst_element_set_locked_state (stream->udpsrc[i], TRUE);
gst_element_set_locked_state (stream->udpsrc[0], TRUE); }
gst_element_set_locked_state (stream->udpsrc[1], TRUE);
/* be notified of caps changes */ /* be notified of caps changes */
stream->caps_sig = g_signal_connect (stream->send_rtp_sink, "notify::caps", stream->caps_sig = g_signal_connect (stream->send_rtp_sink, "notify::caps",
@ -769,6 +754,8 @@ no_ports:
link_failed: link_failed:
{ {
GST_WARNING ("failed to link stream %d", idx); GST_WARNING ("failed to link stream %d", idx);
gst_object_unref (stream->send_rtp_sink);
stream->send_rtp_sink = NULL;
return FALSE; return FALSE;
} }
} }
@ -779,7 +766,8 @@ link_failed:
* @bin: a #GstBin * @bin: a #GstBin
* @rtpbin: a rtpbin #GstElement * @rtpbin: a rtpbin #GstElement
* *
* Remove the elements of @stream from the bin * Remove the elements of @stream from @bin. @bin must be set
* to the NULL state before calling this.
* *
* Return: %TRUE on success. * Return: %TRUE on success.
*/ */
@ -796,22 +784,55 @@ gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin,
if (!stream->is_joined) if (!stream->is_joined)
return TRUE; return TRUE;
/* all transports must be removed by now */
g_return_val_if_fail (stream->transports == NULL, FALSE);
GST_INFO ("stream %p leaving bin", stream); GST_INFO ("stream %p leaving bin", stream);
gst_pad_unlink (stream->srcpad, stream->send_rtp_sink); gst_pad_unlink (stream->srcpad, stream->send_rtp_sink);
g_signal_handler_disconnect (stream->send_rtp_sink, stream->caps_sig); g_signal_handler_disconnect (stream->send_rtp_sink, stream->caps_sig);
gst_element_release_request_pad (rtpbin, stream->send_rtp_sink);
gst_object_unref (stream->send_rtp_sink);
stream->send_rtp_sink = NULL;
/* FIXME not entirely the opposite of join_bin */
for (i = 0; i < 2; i++) { for (i = 0; i < 2; i++) {
/* and set udpsrc to NULL now before removing */
gst_element_set_locked_state (stream->udpsrc[i], FALSE);
gst_element_set_state (stream->udpsrc[i], GST_STATE_NULL);
/* removing them should also nicely release the request
* pads when they finalize */
gst_bin_remove (bin, stream->udpsrc[i]); gst_bin_remove (bin, stream->udpsrc[i]);
gst_bin_remove (bin, stream->udpsink[i]); gst_bin_remove (bin, stream->udpsink[i]);
gst_bin_remove (bin, stream->appsrc[i]); gst_bin_remove (bin, stream->appsrc[i]);
gst_bin_remove (bin, stream->appsink[i]); gst_bin_remove (bin, stream->appsink[i]);
gst_bin_remove (bin, stream->appqueue[i]); gst_bin_remove (bin, stream->appqueue[i]);
gst_bin_remove (bin, stream->tee[i]); gst_bin_remove (bin, stream->tee[i]);
gst_bin_remove (bin, stream->selector[i]); gst_bin_remove (bin, stream->funnel[i]);
gst_element_release_request_pad (rtpbin, stream->recv_sink[i]);
gst_object_unref (stream->recv_sink[i]);
stream->recv_sink[i] = NULL;
stream->udpsrc[i] = NULL;
stream->udpsink[i] = NULL;
stream->appsrc[i] = NULL;
stream->appsink[i] = NULL;
stream->appqueue[i] = NULL;
stream->tee[i] = NULL;
stream->funnel[i] = NULL;
} }
gst_object_unref (stream->send_src[0]);
stream->send_src[0] = NULL;
gst_element_release_request_pad (rtpbin, stream->send_src[1]);
gst_object_unref (stream->send_src[1]);
stream->send_src[1] = NULL;
g_object_unref (stream->session);
if (stream->caps)
gst_caps_unref (stream->caps);
stream->is_joined = FALSE; stream->is_joined = FALSE;
return TRUE; return TRUE;
@ -862,6 +883,10 @@ gst_rtsp_stream_recv_rtp (GstRTSPStream * stream, GstBuffer * buffer)
{ {
GstFlowReturn ret; GstFlowReturn ret;
g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
g_return_val_if_fail (stream->is_joined, FALSE);
ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (stream->appsrc[0]), buffer); ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (stream->appsrc[0]), buffer);
return ret; return ret;
@ -884,6 +909,10 @@ gst_rtsp_stream_recv_rtcp (GstRTSPStream * stream, GstBuffer * buffer)
{ {
GstFlowReturn ret; GstFlowReturn ret;
g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
g_return_val_if_fail (stream->is_joined, FALSE);
ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (stream->appsrc[1]), buffer); ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (stream->appsrc[1]), buffer);
return ret; return ret;
@ -969,6 +998,8 @@ update_transport (GstRTSPStream * stream, GstRTSPStreamTransport * trans,
* Add the transport in @trans to @stream. The media of @stream will * Add the transport in @trans to @stream. The media of @stream will
* then also be send to the values configured in @trans. * then also be send to the values configured in @trans.
* *
* @stream must be joined to a bin.
*
* @trans must contain a valid #GstRTSPTransport. * @trans must contain a valid #GstRTSPTransport.
* *
* Returns: %TRUE if @trans was added * Returns: %TRUE if @trans was added
@ -979,6 +1010,7 @@ gst_rtsp_stream_add_transport (GstRTSPStream * stream,
{ {
g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE); g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE); g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
g_return_val_if_fail (stream->is_joined, FALSE);
g_return_val_if_fail (trans->transport != NULL, FALSE); g_return_val_if_fail (trans->transport != NULL, FALSE);
return update_transport (stream, trans, TRUE); return update_transport (stream, trans, TRUE);
@ -992,6 +1024,10 @@ gst_rtsp_stream_add_transport (GstRTSPStream * stream,
* Remove the transport in @trans from @stream. The media of @stream will * Remove the transport in @trans from @stream. The media of @stream will
* not be sent to the values configured in @trans. * not be sent to the values configured in @trans.
* *
* @stream must be joined to a bin.
*
* @trans must contain a valid #GstRTSPTransport.
*
* Returns: %TRUE if @trans was removed * Returns: %TRUE if @trans was removed
*/ */
gboolean gboolean
@ -1000,6 +1036,7 @@ gst_rtsp_stream_remove_transport (GstRTSPStream * stream,
{ {
g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE); g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE); g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
g_return_val_if_fail (stream->is_joined, FALSE);
g_return_val_if_fail (trans->transport != NULL, FALSE); g_return_val_if_fail (trans->transport != NULL, FALSE);
return update_transport (stream, trans, FALSE); return update_transport (stream, trans, FALSE);

View file

@ -50,21 +50,24 @@ typedef struct _GstRTSPStreamClass GstRTSPStreamClass;
* @is_ipv6: should this stream be IPv6 * @is_ipv6: should this stream be IPv6
* @buffer_size: the UDP buffer size * @buffer_size: the UDP buffer size
* @is_joined: if the stream is joined in a bin * @is_joined: if the stream is joined in a bin
* @recv_rtp_sink: sinkpad for RTP buffers * @send_rtp_sink: sinkpad for sending RTP buffers
* @recv_rtcp_sink: sinkpad for RTCP buffers * @recv_sink: sinkpad for receiving RTP/RTCP buffers
* @send_rtp_src: srcpad for RTP buffers * @send_src: srcpad for sending RTP/RTCP buffers
* @send_rtcp_src: srcpad for RTCP buffers * @session: the RTP session object
* @udpsrc: the udp source elements for RTP/RTCP * @udpsrc: the udp source elements for RTP/RTCP
* @udpsink: the udp sink elements for RTP/RTCP * @udpsink: the udp sink elements for RTP/RTCP
* @appsrc: the app source elements for RTP/RTCP * @appsrc: the app source elements for RTP/RTCP
* @appqueue: the app queue elements for RTP/RTCP
* @appsink: the app sink elements for RTP/RTCP * @appsink: the app sink elements for RTP/RTCP
* @tee: tee for the sending to udpsink and appsink
* @funnel: tee for the receiving from udpsrc and appsrc
* @server_port: the server ports for this stream * @server_port: the server ports for this stream
* @caps_sig: the signal id for detecting caps * @caps_sig: the signal id for detecting caps
* @caps: the caps of the stream * @caps: the caps of the stream
* @n_active: the number of active transports in @transports * @n_active: the number of active transports in @transports
* @transports: list of #GstStreamTransport being streamed to * @transports: list of #GstStreamTransport being streamed to
* *
* The definition of a media stream. The streams are identified by @id. * The definition of a media stream. The streams are identified by @idx.
*/ */
struct _GstRTSPStream { struct _GstRTSPStream {
GObject parent; GObject parent;
@ -77,11 +80,9 @@ struct _GstRTSPStream {
gboolean is_joined; gboolean is_joined;
/* pads on the rtpbin */ /* pads on the rtpbin */
GstPad *recv_rtcp_sink;
GstPad *recv_rtp_sink;
GstPad *send_rtp_sink; GstPad *send_rtp_sink;
GstPad *send_rtp_src; GstPad *recv_sink[2];
GstPad *send_rtcp_src; GstPad *send_src[2];
/* the RTPSession object */ /* the RTPSession object */
GObject *session; GObject *session;
@ -96,7 +97,7 @@ struct _GstRTSPStream {
GstElement *appsink[2]; GstElement *appsink[2];
GstElement *tee[2]; GstElement *tee[2];
GstElement *selector[2]; GstElement *funnel[2];
/* server ports for sending/receiving */ /* server ports for sending/receiving */
GstRTSPRange server_port; GstRTSPRange server_port;
@ -123,7 +124,8 @@ void gst_rtsp_stream_set_mtu (GstRTSPStream * stream, guin
guint gst_rtsp_stream_get_mtu (GstRTSPStream * stream); guint gst_rtsp_stream_get_mtu (GstRTSPStream * stream);
gboolean gst_rtsp_stream_join_bin (GstRTSPStream * stream, gboolean gst_rtsp_stream_join_bin (GstRTSPStream * stream,
GstBin *bin, GstElement *rtpbin); GstBin *bin, GstElement *rtpbin,
GstState state);
gboolean gst_rtsp_stream_leave_bin (GstRTSPStream * stream, gboolean gst_rtsp_stream_leave_bin (GstRTSPStream * stream,
GstBin *bin, GstElement *rtpbin); GstBin *bin, GstElement *rtpbin);