/* GStreamer * Copyright (C) 2008 Wim Taymans * Copyright (C) 2015 Centricular Ltd * Author: Sebastian Dröge * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Library General Public * License as published by the Free Software Foundation; either * version 2 of the License, or (at your option) any later version. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Library General Public License for more details. * * You should have received a copy of the GNU Library General Public * License along with this library; if not, write to the * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, * Boston, MA 02110-1301, USA. */ /** * SECTION:rtsp-stream * @short_description: A media stream * @see_also: #GstRTSPMedia * * The #GstRTSPStream object manages the data transport for one stream. It * is created from a payloader element and a source pad that produce the RTP * packets for the stream. * * With gst_rtsp_stream_join_bin() the streaming elements are added to the bin * and rtpbin. gst_rtsp_stream_leave_bin() removes the elements again. * * The #GstRTSPStream will use the configured addresspool, as set with * gst_rtsp_stream_set_address_pool(), to allocate multicast addresses for the * stream. With gst_rtsp_stream_get_multicast_address() you can get the * configured address. * * With gst_rtsp_stream_get_server_port () you can get the port that the server * will use to receive RTCP. This is the part that the clients will use to send * RTCP to. * * With gst_rtsp_stream_add_transport() destinations can be added where the * stream should be sent to. Use gst_rtsp_stream_remove_transport() to remove * the destination again. * * Last reviewed on 2013-07-16 (1.0.0) */ #include #include #include #include #include #include #include #include "rtsp-stream.h" #define GST_RTSP_STREAM_GET_PRIVATE(obj) \ (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_RTSP_STREAM, GstRTSPStreamPrivate)) struct _GstRTSPStreamPrivate { GMutex lock; guint idx; /* Only one pad is ever set */ GstPad *srcpad, *sinkpad; GstElement *payloader; guint buffer_size; GstBin *joined_bin; /* TRUE if this stream is running on * the client side of an RTSP link (for RECORD) */ gboolean client_side; gchar *control; /* TRUE if stream is complete. This means that the receiver and the sender * parts are present in the stream. */ gboolean is_complete; GstRTSPProfile profiles; GstRTSPLowerTrans protocols; /* pads on the rtpbin */ GstPad *send_rtp_sink; GstPad *recv_rtp_src; GstPad *recv_sink[2]; GstPad *send_src[2]; /* the RTPSession object */ GObject *session; /* SRTP encoder/decoder */ GstElement *srtpenc; GstElement *srtpdec; GHashTable *keys; /* for UDP unicast */ GstElement *udpsrc_v4[2]; GstElement *udpsrc_v6[2]; GstElement *udpqueue[2]; GstElement *udpsink[2]; GSocket *socket_v4[2]; GSocket *socket_v6[2]; /* for UDP multicast */ GstElement *mcast_udpsrc_v4[2]; GstElement *mcast_udpsrc_v6[2]; GstElement *mcast_udpqueue[2]; GstElement *mcast_udpsink[2]; GSocket *mcast_socket_v4[2]; GSocket *mcast_socket_v6[2]; /* for TCP transport */ GstElement *appsrc[2]; GstClockTime appsrc_base_time[2]; GstElement *appqueue[2]; GstElement *appsink[2]; GstElement *tee[2]; GstElement *funnel[2]; /* retransmission */ GstElement *rtxsend; guint rtx_pt; GstClockTime rtx_time; /* pool used to manage unicast and multicast addresses */ GstRTSPAddressPool *pool; /* unicast server addr/port */ GstRTSPAddress *server_addr_v4; GstRTSPAddress *server_addr_v6; /* multicast addresses */ GstRTSPAddress *mcast_addr_v4; GstRTSPAddress *mcast_addr_v6; gchar *multicast_iface; /* the caps of the stream */ gulong caps_sig; GstCaps *caps; /* transports we stream to */ guint n_active; GList *transports; guint transports_cookie; GList *tr_cache_rtp; GList *tr_cache_rtcp; guint tr_cache_cookie_rtp; guint tr_cache_cookie_rtcp; gint dscp_qos; /* stream blocking */ gulong blocked_id[2]; gboolean blocking; /* current stream postion */ GstClockTime position; /* pt->caps map for RECORD streams */ GHashTable *ptmap; GstRTSPPublishClockMode publish_clock_mode; }; #define DEFAULT_CONTROL NULL #define DEFAULT_PROFILES GST_RTSP_PROFILE_AVP #define DEFAULT_PROTOCOLS GST_RTSP_LOWER_TRANS_UDP | GST_RTSP_LOWER_TRANS_UDP_MCAST | \ GST_RTSP_LOWER_TRANS_TCP enum { PROP_0, PROP_CONTROL, PROP_PROFILES, PROP_PROTOCOLS, PROP_LAST }; enum { SIGNAL_NEW_RTP_ENCODER, SIGNAL_NEW_RTCP_ENCODER, SIGNAL_LAST }; GST_DEBUG_CATEGORY_STATIC (rtsp_stream_debug); #define GST_CAT_DEFAULT rtsp_stream_debug static GQuark ssrc_stream_map_key; static void gst_rtsp_stream_get_property (GObject * object, guint propid, GValue * value, GParamSpec * pspec); static void gst_rtsp_stream_set_property (GObject * object, guint propid, const GValue * value, GParamSpec * pspec); static void gst_rtsp_stream_finalize (GObject * obj); static guint gst_rtsp_stream_signals[SIGNAL_LAST] = { 0 }; G_DEFINE_TYPE (GstRTSPStream, gst_rtsp_stream, G_TYPE_OBJECT); static void gst_rtsp_stream_class_init (GstRTSPStreamClass * klass) { GObjectClass *gobject_class; g_type_class_add_private (klass, sizeof (GstRTSPStreamPrivate)); gobject_class = G_OBJECT_CLASS (klass); gobject_class->get_property = gst_rtsp_stream_get_property; gobject_class->set_property = gst_rtsp_stream_set_property; gobject_class->finalize = gst_rtsp_stream_finalize; g_object_class_install_property (gobject_class, PROP_CONTROL, g_param_spec_string ("control", "Control", "The control string for this stream", DEFAULT_CONTROL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); g_object_class_install_property (gobject_class, PROP_PROFILES, g_param_spec_flags ("profiles", "Profiles", "Allowed transfer profiles", GST_TYPE_RTSP_PROFILE, DEFAULT_PROFILES, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); g_object_class_install_property (gobject_class, PROP_PROTOCOLS, g_param_spec_flags ("protocols", "Protocols", "Allowed lower transport protocols", GST_TYPE_RTSP_LOWER_TRANS, DEFAULT_PROTOCOLS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); gst_rtsp_stream_signals[SIGNAL_NEW_RTP_ENCODER] = g_signal_new ("new-rtp-encoder", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 1, GST_TYPE_ELEMENT); gst_rtsp_stream_signals[SIGNAL_NEW_RTCP_ENCODER] = g_signal_new ("new-rtcp-encoder", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 1, GST_TYPE_ELEMENT); GST_DEBUG_CATEGORY_INIT (rtsp_stream_debug, "rtspstream", 0, "GstRTSPStream"); ssrc_stream_map_key = g_quark_from_static_string ("GstRTSPServer.stream"); } static void gst_rtsp_stream_init (GstRTSPStream * stream) { GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream); GST_DEBUG ("new stream %p", stream); stream->priv = priv; priv->dscp_qos = -1; priv->control = g_strdup (DEFAULT_CONTROL); priv->profiles = DEFAULT_PROFILES; priv->protocols = DEFAULT_PROTOCOLS; priv->publish_clock_mode = GST_RTSP_PUBLISH_CLOCK_MODE_CLOCK; g_mutex_init (&priv->lock); priv->keys = g_hash_table_new_full (g_direct_hash, g_direct_equal, NULL, (GDestroyNotify) gst_caps_unref); priv->ptmap = g_hash_table_new_full (NULL, NULL, NULL, (GDestroyNotify) gst_caps_unref); } static void gst_rtsp_stream_finalize (GObject * obj) { GstRTSPStream *stream; GstRTSPStreamPrivate *priv; guint i; stream = GST_RTSP_STREAM (obj); priv = stream->priv; GST_DEBUG ("finalize stream %p", stream); /* we really need to be unjoined now */ g_return_if_fail (priv->joined_bin == NULL); if (priv->mcast_addr_v4) gst_rtsp_address_free (priv->mcast_addr_v4); if (priv->mcast_addr_v6) gst_rtsp_address_free (priv->mcast_addr_v6); if (priv->server_addr_v4) gst_rtsp_address_free (priv->server_addr_v4); if (priv->server_addr_v6) gst_rtsp_address_free (priv->server_addr_v6); if (priv->pool) g_object_unref (priv->pool); if (priv->rtxsend) g_object_unref (priv->rtxsend); for (i = 0; i < 2; i++) { if (priv->socket_v4[i]) g_object_unref (priv->socket_v4[i]); if (priv->socket_v6[i]) g_object_unref (priv->socket_v6[i]); if (priv->mcast_socket_v4[i]) g_object_unref (priv->mcast_socket_v4[i]); if (priv->mcast_socket_v6[i]) g_object_unref (priv->mcast_socket_v6[i]); } g_free (priv->multicast_iface); gst_object_unref (priv->payloader); if (priv->srcpad) gst_object_unref (priv->srcpad); if (priv->sinkpad) gst_object_unref (priv->sinkpad); g_free (priv->control); g_mutex_clear (&priv->lock); g_hash_table_unref (priv->keys); g_hash_table_destroy (priv->ptmap); G_OBJECT_CLASS (gst_rtsp_stream_parent_class)->finalize (obj); } static void gst_rtsp_stream_get_property (GObject * object, guint propid, GValue * value, GParamSpec * pspec) { GstRTSPStream *stream = GST_RTSP_STREAM (object); switch (propid) { case PROP_CONTROL: g_value_take_string (value, gst_rtsp_stream_get_control (stream)); break; case PROP_PROFILES: g_value_set_flags (value, gst_rtsp_stream_get_profiles (stream)); break; case PROP_PROTOCOLS: g_value_set_flags (value, gst_rtsp_stream_get_protocols (stream)); break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec); } } static void gst_rtsp_stream_set_property (GObject * object, guint propid, const GValue * value, GParamSpec * pspec) { GstRTSPStream *stream = GST_RTSP_STREAM (object); switch (propid) { case PROP_CONTROL: gst_rtsp_stream_set_control (stream, g_value_get_string (value)); break; case PROP_PROFILES: gst_rtsp_stream_set_profiles (stream, g_value_get_flags (value)); break; case PROP_PROTOCOLS: gst_rtsp_stream_set_protocols (stream, g_value_get_flags (value)); break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec); } } /** * gst_rtsp_stream_new: * @idx: an index * @pad: a #GstPad * @payloader: a #GstElement * * Create a new media stream with index @idx that handles RTP data on * @pad and has a payloader element @payloader if @pad is a source pad * or a depayloader element @payloader if @pad is a sink pad. * * Returns: (transfer full): a new #GstRTSPStream */ GstRTSPStream * gst_rtsp_stream_new (guint idx, GstElement * payloader, GstPad * pad) { GstRTSPStreamPrivate *priv; GstRTSPStream *stream; g_return_val_if_fail (GST_IS_ELEMENT (payloader), NULL); g_return_val_if_fail (GST_IS_PAD (pad), NULL); stream = g_object_new (GST_TYPE_RTSP_STREAM, NULL); priv = stream->priv; priv->idx = idx; priv->payloader = gst_object_ref (payloader); if (GST_PAD_IS_SRC (pad)) priv->srcpad = gst_object_ref (pad); else priv->sinkpad = gst_object_ref (pad); return stream; } /** * gst_rtsp_stream_get_index: * @stream: a #GstRTSPStream * * Get the stream index. * * Return: the stream index. */ guint gst_rtsp_stream_get_index (GstRTSPStream * stream) { g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1); return stream->priv->idx; } /** * gst_rtsp_stream_get_pt: * @stream: a #GstRTSPStream * * Get the stream payload type. * * Return: the stream payload type. */ guint gst_rtsp_stream_get_pt (GstRTSPStream * stream) { GstRTSPStreamPrivate *priv; guint pt; g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1); priv = stream->priv; g_object_get (G_OBJECT (priv->payloader), "pt", &pt, NULL); return pt; } /** * gst_rtsp_stream_get_srcpad: * @stream: a #GstRTSPStream * * Get the srcpad associated with @stream. * * Returns: (transfer full): the srcpad. Unref after usage. */ GstPad * gst_rtsp_stream_get_srcpad (GstRTSPStream * stream) { g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL); if (!stream->priv->srcpad) return NULL; return gst_object_ref (stream->priv->srcpad); } /** * gst_rtsp_stream_get_sinkpad: * @stream: a #GstRTSPStream * * Get the sinkpad associated with @stream. * * Returns: (transfer full): the sinkpad. Unref after usage. */ GstPad * gst_rtsp_stream_get_sinkpad (GstRTSPStream * stream) { g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL); if (!stream->priv->sinkpad) return NULL; return gst_object_ref (stream->priv->sinkpad); } /** * gst_rtsp_stream_get_control: * @stream: a #GstRTSPStream * * Get the control string to identify this stream. * * Returns: (transfer full): the control string. g_free() after usage. */ gchar * gst_rtsp_stream_get_control (GstRTSPStream * stream) { GstRTSPStreamPrivate *priv; gchar *result; g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL); priv = stream->priv; g_mutex_lock (&priv->lock); if ((result = g_strdup (priv->control)) == NULL) result = g_strdup_printf ("stream=%u", priv->idx); g_mutex_unlock (&priv->lock); return result; } /** * gst_rtsp_stream_set_control: * @stream: a #GstRTSPStream * @control: a control string * * Set the control string in @stream. */ void gst_rtsp_stream_set_control (GstRTSPStream * stream, const gchar * control) { GstRTSPStreamPrivate *priv; g_return_if_fail (GST_IS_RTSP_STREAM (stream)); priv = stream->priv; g_mutex_lock (&priv->lock); g_free (priv->control); priv->control = g_strdup (control); g_mutex_unlock (&priv->lock); } /** * gst_rtsp_stream_has_control: * @stream: a #GstRTSPStream * @control: a control string * * Check if @stream has the control string @control. * * Returns: %TRUE is @stream has @control as the control string */ gboolean gst_rtsp_stream_has_control (GstRTSPStream * stream, const gchar * control) { GstRTSPStreamPrivate *priv; gboolean res; g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE); priv = stream->priv; g_mutex_lock (&priv->lock); if (priv->control) res = (g_strcmp0 (priv->control, control) == 0); else { guint streamid; if (sscanf (control, "stream=%u", &streamid) > 0) res = (streamid == priv->idx); else res = FALSE; } g_mutex_unlock (&priv->lock); return res; } /** * gst_rtsp_stream_set_mtu: * @stream: a #GstRTSPStream * @mtu: a new MTU * * Configure the mtu in the payloader of @stream to @mtu. */ void gst_rtsp_stream_set_mtu (GstRTSPStream * stream, guint mtu) { GstRTSPStreamPrivate *priv; g_return_if_fail (GST_IS_RTSP_STREAM (stream)); priv = stream->priv; GST_LOG_OBJECT (stream, "set MTU %u", mtu); g_object_set (G_OBJECT (priv->payloader), "mtu", mtu, NULL); } /** * gst_rtsp_stream_get_mtu: * @stream: a #GstRTSPStream * * Get the configured MTU in the payloader of @stream. * * Returns: the MTU of the payloader. */ guint gst_rtsp_stream_get_mtu (GstRTSPStream * stream) { GstRTSPStreamPrivate *priv; guint mtu; g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0); priv = stream->priv; g_object_get (G_OBJECT (priv->payloader), "mtu", &mtu, NULL); return mtu; } /* Update the dscp qos property on the udp sinks */ static void update_dscp_qos (GstRTSPStream * stream, GstElement ** udpsink) { GstRTSPStreamPrivate *priv; priv = stream->priv; if (*udpsink) { g_object_set (G_OBJECT (*udpsink), "qos-dscp", priv->dscp_qos, NULL); } } /** * gst_rtsp_stream_set_dscp_qos: * @stream: a #GstRTSPStream * @dscp_qos: a new dscp qos value (0-63, or -1 to disable) * * Configure the dscp qos of the outgoing sockets to @dscp_qos. */ void gst_rtsp_stream_set_dscp_qos (GstRTSPStream * stream, gint dscp_qos) { GstRTSPStreamPrivate *priv; g_return_if_fail (GST_IS_RTSP_STREAM (stream)); priv = stream->priv; GST_LOG_OBJECT (stream, "set DSCP QoS %d", dscp_qos); if (dscp_qos < -1 || dscp_qos > 63) { GST_WARNING_OBJECT (stream, "trying to set illegal dscp qos %d", dscp_qos); return; } priv->dscp_qos = dscp_qos; update_dscp_qos (stream, priv->udpsink); } /** * gst_rtsp_stream_get_dscp_qos: * @stream: a #GstRTSPStream * * Get the configured DSCP QoS in of the outgoing sockets. * * Returns: the DSCP QoS value of the outgoing sockets, or -1 if disbled. */ gint gst_rtsp_stream_get_dscp_qos (GstRTSPStream * stream) { GstRTSPStreamPrivate *priv; g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1); priv = stream->priv; return priv->dscp_qos; } /** * gst_rtsp_stream_is_transport_supported: * @stream: a #GstRTSPStream * @transport: (transfer none): a #GstRTSPTransport * * Check if @transport can be handled by stream * * Returns: %TRUE if @transport can be handled by @stream. */ gboolean gst_rtsp_stream_is_transport_supported (GstRTSPStream * stream, GstRTSPTransport * transport) { GstRTSPStreamPrivate *priv; g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE); priv = stream->priv; g_mutex_lock (&priv->lock); if (transport->trans != GST_RTSP_TRANS_RTP) goto unsupported_transmode; if (!(transport->profile & priv->profiles)) goto unsupported_profile; if (!(transport->lower_transport & priv->protocols)) goto unsupported_ltrans; g_mutex_unlock (&priv->lock); return TRUE; /* ERRORS */ unsupported_transmode: { GST_DEBUG ("unsupported transport mode %d", transport->trans); g_mutex_unlock (&priv->lock); return FALSE; } unsupported_profile: { GST_DEBUG ("unsupported profile %d", transport->profile); g_mutex_unlock (&priv->lock); return FALSE; } unsupported_ltrans: { GST_DEBUG ("unsupported lower transport %d", transport->lower_transport); g_mutex_unlock (&priv->lock); return FALSE; } } /** * gst_rtsp_stream_set_profiles: * @stream: a #GstRTSPStream * @profiles: the new profiles * * Configure the allowed profiles for @stream. */ void gst_rtsp_stream_set_profiles (GstRTSPStream * stream, GstRTSPProfile profiles) { GstRTSPStreamPrivate *priv; g_return_if_fail (GST_IS_RTSP_STREAM (stream)); priv = stream->priv; g_mutex_lock (&priv->lock); priv->profiles = profiles; g_mutex_unlock (&priv->lock); } /** * gst_rtsp_stream_get_profiles: * @stream: a #GstRTSPStream * * Get the allowed profiles of @stream. * * Returns: a #GstRTSPProfile */ GstRTSPProfile gst_rtsp_stream_get_profiles (GstRTSPStream * stream) { GstRTSPStreamPrivate *priv; GstRTSPProfile res; g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_RTSP_PROFILE_UNKNOWN); priv = stream->priv; g_mutex_lock (&priv->lock); res = priv->profiles; g_mutex_unlock (&priv->lock); return res; } /** * gst_rtsp_stream_set_protocols: * @stream: a #GstRTSPStream * @protocols: the new flags * * Configure the allowed lower transport for @stream. */ void gst_rtsp_stream_set_protocols (GstRTSPStream * stream, GstRTSPLowerTrans protocols) { GstRTSPStreamPrivate *priv; g_return_if_fail (GST_IS_RTSP_STREAM (stream)); priv = stream->priv; g_mutex_lock (&priv->lock); priv->protocols = protocols; g_mutex_unlock (&priv->lock); } /** * gst_rtsp_stream_get_protocols: * @stream: a #GstRTSPStream * * Get the allowed protocols of @stream. * * Returns: a #GstRTSPLowerTrans */ GstRTSPLowerTrans gst_rtsp_stream_get_protocols (GstRTSPStream * stream) { GstRTSPStreamPrivate *priv; GstRTSPLowerTrans res; g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_RTSP_LOWER_TRANS_UNKNOWN); priv = stream->priv; g_mutex_lock (&priv->lock); res = priv->protocols; g_mutex_unlock (&priv->lock); return res; } /** * gst_rtsp_stream_set_address_pool: * @stream: a #GstRTSPStream * @pool: (transfer none): a #GstRTSPAddressPool * * configure @pool to be used as the address pool of @stream. */ void gst_rtsp_stream_set_address_pool (GstRTSPStream * stream, GstRTSPAddressPool * pool) { GstRTSPStreamPrivate *priv; GstRTSPAddressPool *old; g_return_if_fail (GST_IS_RTSP_STREAM (stream)); priv = stream->priv; GST_LOG_OBJECT (stream, "set address pool %p", pool); g_mutex_lock (&priv->lock); if ((old = priv->pool) != pool) priv->pool = pool ? g_object_ref (pool) : NULL; else old = NULL; g_mutex_unlock (&priv->lock); if (old) g_object_unref (old); } /** * gst_rtsp_stream_get_address_pool: * @stream: a #GstRTSPStream * * Get the #GstRTSPAddressPool used as the address pool of @stream. * * Returns: (transfer full): the #GstRTSPAddressPool of @stream. g_object_unref() after * usage. */ GstRTSPAddressPool * gst_rtsp_stream_get_address_pool (GstRTSPStream * stream) { GstRTSPStreamPrivate *priv; GstRTSPAddressPool *result; g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL); priv = stream->priv; g_mutex_lock (&priv->lock); if ((result = priv->pool)) g_object_ref (result); g_mutex_unlock (&priv->lock); return result; } /** * gst_rtsp_stream_set_multicast_iface: * @stream: a #GstRTSPStream * @multicast_iface: (transfer none): a multicast interface name * * configure @multicast_iface to be used for @stream. */ void gst_rtsp_stream_set_multicast_iface (GstRTSPStream * stream, const gchar * multicast_iface) { GstRTSPStreamPrivate *priv; gchar *old; g_return_if_fail (GST_IS_RTSP_STREAM (stream)); priv = stream->priv; GST_LOG_OBJECT (stream, "set multicast iface %s", GST_STR_NULL (multicast_iface)); g_mutex_lock (&priv->lock); if ((old = priv->multicast_iface) != multicast_iface) priv->multicast_iface = multicast_iface ? g_strdup (multicast_iface) : NULL; else old = NULL; g_mutex_unlock (&priv->lock); if (old) g_free (old); } /** * gst_rtsp_stream_get_multicast_iface: * @stream: a #GstRTSPStream * * Get the multicast interface used for @stream. * * Returns: (transfer full): the multicast interface for @stream. g_free() after * usage. */ gchar * gst_rtsp_stream_get_multicast_iface (GstRTSPStream * stream) { GstRTSPStreamPrivate *priv; gchar *result; g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL); priv = stream->priv; g_mutex_lock (&priv->lock); if ((result = priv->multicast_iface)) result = g_strdup (result); g_mutex_unlock (&priv->lock); return result; } /** * gst_rtsp_stream_get_multicast_address: * @stream: a #GstRTSPStream * @family: the #GSocketFamily * * Get the multicast address of @stream for @family. The original * #GstRTSPAddress is cached and copy is returned, so freeing the return value * won't release the address from the pool. * * Returns: (transfer full) (nullable): the #GstRTSPAddress of @stream * or %NULL when no address could be allocated. gst_rtsp_address_free() * after usage. */ GstRTSPAddress * gst_rtsp_stream_get_multicast_address (GstRTSPStream * stream, GSocketFamily family) { GstRTSPStreamPrivate *priv; GstRTSPAddress *result; GstRTSPAddress **addrp; GstRTSPAddressFlags flags; g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL); priv = stream->priv; g_mutex_lock (&stream->priv->lock); if (family == G_SOCKET_FAMILY_IPV6) { flags = GST_RTSP_ADDRESS_FLAG_IPV6; addrp = &priv->mcast_addr_v6; } else { flags = GST_RTSP_ADDRESS_FLAG_IPV4; addrp = &priv->mcast_addr_v4; } if (*addrp == NULL) { if (priv->pool == NULL) goto no_pool; flags |= GST_RTSP_ADDRESS_FLAG_EVEN_PORT | GST_RTSP_ADDRESS_FLAG_MULTICAST; *addrp = gst_rtsp_address_pool_acquire_address (priv->pool, flags, 2); if (*addrp == NULL) goto no_address; /* FIXME: Also reserve the same port with unicast ANY address, since that's * where we are going to bind our socket. Probably loop until we find a port * available in both mcast and unicast pools. Maybe GstRTSPAddressPool * should do it for us when both GST_RTSP_ADDRESS_FLAG_MULTICAST and * GST_RTSP_ADDRESS_FLAG_UNICAST are givent. */ } result = gst_rtsp_address_copy (*addrp); g_mutex_unlock (&stream->priv->lock); return result; /* ERRORS */ no_pool: { GST_ERROR_OBJECT (stream, "no address pool specified"); g_mutex_unlock (&stream->priv->lock); return NULL; } no_address: { GST_ERROR_OBJECT (stream, "failed to acquire address from pool"); g_mutex_unlock (&stream->priv->lock); return NULL; } } /** * gst_rtsp_stream_reserve_address: * @stream: a #GstRTSPStream * @address: an address * @port: a port * @n_ports: n_ports * @ttl: a TTL * * Reserve @address and @port as the address and port of @stream. The original * #GstRTSPAddress is cached and copy is returned, so freeing the return value * won't release the address from the pool. * * Returns: (nullable): the #GstRTSPAddress of @stream or %NULL when * the address could be reserved. gst_rtsp_address_free() after usage. */ GstRTSPAddress * gst_rtsp_stream_reserve_address (GstRTSPStream * stream, const gchar * address, guint port, guint n_ports, guint ttl) { GstRTSPStreamPrivate *priv; GstRTSPAddress *result; GInetAddress *addr; GSocketFamily family; GstRTSPAddress **addrp; g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL); g_return_val_if_fail (address != NULL, NULL); g_return_val_if_fail (port > 0, NULL); g_return_val_if_fail (n_ports > 0, NULL); g_return_val_if_fail (ttl > 0, NULL); priv = stream->priv; addr = g_inet_address_new_from_string (address); if (!addr) { GST_ERROR ("failed to get inet addr from %s", address); family = G_SOCKET_FAMILY_IPV4; } else { family = g_inet_address_get_family (addr); g_object_unref (addr); } if (family == G_SOCKET_FAMILY_IPV6) addrp = &priv->mcast_addr_v6; else addrp = &priv->mcast_addr_v4; g_mutex_lock (&priv->lock); if (*addrp == NULL) { GstRTSPAddressPoolResult res; if (priv->pool == NULL) goto no_pool; res = gst_rtsp_address_pool_reserve_address (priv->pool, address, port, n_ports, ttl, addrp); if (res != GST_RTSP_ADDRESS_POOL_OK) goto no_address; /* FIXME: Also reserve the same port with unicast ANY address, since that's * where we are going to bind our socket. */ } else { if (g_ascii_strcasecmp ((*addrp)->address, address) || (*addrp)->port != port || (*addrp)->n_ports != n_ports || (*addrp)->ttl != ttl) goto different_address; } result = gst_rtsp_address_copy (*addrp); g_mutex_unlock (&priv->lock); return result; /* ERRORS */ no_pool: { GST_ERROR_OBJECT (stream, "no address pool specified"); g_mutex_unlock (&priv->lock); return NULL; } no_address: { GST_ERROR_OBJECT (stream, "failed to acquire address %s from pool", address); g_mutex_unlock (&priv->lock); return NULL; } different_address: { GST_ERROR_OBJECT (stream, "address %s is not the same as %s that was already reserved", address, (*addrp)->address); g_mutex_unlock (&priv->lock); return NULL; } } /* must be called with lock */ static void set_socket_for_udpsink (GstElement * udpsink, GSocket * socket, GSocketFamily family) { const gchar *multisink_socket; if (family == G_SOCKET_FAMILY_IPV6) multisink_socket = "socket-v6"; else multisink_socket = "socket"; g_object_set (G_OBJECT (udpsink), multisink_socket, socket, NULL); } /* must be called with lock */ static void set_multicast_socket_for_udpsink (GstElement * udpsink, GSocket * socket, GSocketFamily family, const gchar * multicast_iface, const gchar * addr_str, gint port) { set_socket_for_udpsink (udpsink, socket, family); if (multicast_iface) { g_object_set (G_OBJECT (udpsink), "multicast-iface", multicast_iface, NULL); } g_signal_emit_by_name (udpsink, "add", addr_str, port, NULL); } /* must be called with lock */ static void set_unicast_socket_for_udpsink (GstElement * udpsink, GSocket * socket, GSocketFamily family) { set_socket_for_udpsink (udpsink, socket, family); } static guint16 get_port_from_socket (GSocket * socket) { guint16 port; GSocketAddress *sockaddr; GError *err; GST_DEBUG ("socket: %p", socket); sockaddr = g_socket_get_local_address (socket, &err); if (sockaddr == NULL || !G_IS_INET_SOCKET_ADDRESS (sockaddr)) { g_clear_object (&sockaddr); GST_ERROR ("failed to get sockaddr: %s", err->message); g_error_free (err); return 0; } port = g_inet_socket_address_get_port (G_INET_SOCKET_ADDRESS (sockaddr)); g_object_unref (sockaddr); return port; } static gboolean create_and_configure_udpsink (GstRTSPStream * stream, GstElement ** udpsink, GSocket * socket_v4, GSocket * socket_v6, gboolean multicast, gboolean is_rtp) { GstRTSPStreamPrivate *priv = stream->priv; *udpsink = gst_element_factory_make ("multiudpsink", NULL); if (!*udpsink) goto no_udp_protocol; /* configure sinks */ g_object_set (G_OBJECT (*udpsink), "close-socket", FALSE, NULL); g_object_set (G_OBJECT (*udpsink), "send-duplicates", FALSE, NULL); if (is_rtp) g_object_set (G_OBJECT (*udpsink), "buffer-size", priv->buffer_size, NULL); else g_object_set (G_OBJECT (*udpsink), "sync", FALSE, NULL); /* Needs to be async for RECORD streams, otherwise we will never go to * PLAYING because the sinks will wait for data while the udpsrc can't * provide data with timestamps in PAUSED. */ if (!is_rtp || priv->sinkpad) g_object_set (G_OBJECT (*udpsink), "async", FALSE, NULL); if (multicast) { /* join multicast group when adding clients, so we'll start receiving from it. * We cannot rely on the udpsrc to join the group since its socket is always a * local unicast one. */ g_object_set (G_OBJECT (*udpsink), "auto-multicast", TRUE, NULL); g_object_set (G_OBJECT (*udpsink), "loop", FALSE, NULL); } /* update the dscp qos field in the sinks */ update_dscp_qos (stream, udpsink); if (priv->server_addr_v4) { GST_DEBUG_OBJECT (stream, "udp IPv4, configure udpsinks"); set_unicast_socket_for_udpsink (*udpsink, socket_v4, G_SOCKET_FAMILY_IPV4); } if (priv->server_addr_v6) { GST_DEBUG_OBJECT (stream, "udp IPv6, configure udpsinks"); set_unicast_socket_for_udpsink (*udpsink, socket_v6, G_SOCKET_FAMILY_IPV6); } if (multicast) { gint port; if (priv->mcast_addr_v4) { GST_DEBUG_OBJECT (stream, "mcast IPv4, configure udpsinks"); port = get_port_from_socket (socket_v4); if (!port) goto get_port_failed; set_multicast_socket_for_udpsink (*udpsink, socket_v4, G_SOCKET_FAMILY_IPV4, priv->multicast_iface, priv->mcast_addr_v4->address, port); } if (priv->mcast_addr_v6) { GST_DEBUG_OBJECT (stream, "mcast IPv6, configure udpsinks"); port = get_port_from_socket (socket_v6); if (!port) goto get_port_failed; set_multicast_socket_for_udpsink (*udpsink, socket_v6, G_SOCKET_FAMILY_IPV6, priv->multicast_iface, priv->mcast_addr_v6->address, port); } } return TRUE; /* ERRORS */ no_udp_protocol: { GST_ERROR_OBJECT (stream, "failed to create udpsink element"); return FALSE; } get_port_failed: { GST_ERROR_OBJECT (stream, "failed to get udp port"); return FALSE; } } /* must be called with lock */ static gboolean create_and_configure_udpsource (GstElement ** udpsrc, GSocket * socket) { GstStateChangeReturn ret; g_assert (socket != NULL); *udpsrc = gst_element_factory_make ("udpsrc", NULL); if (*udpsrc == NULL) goto error; g_object_set (G_OBJECT (*udpsrc), "socket", socket, NULL); /* The udpsrc cannot do the join because its socket is always a local unicast * one. The udpsink sharing the same socket will do it for us. */ g_object_set (G_OBJECT (*udpsrc), "auto-multicast", FALSE, NULL); g_object_set (G_OBJECT (*udpsrc), "loop", FALSE, NULL); g_object_set (G_OBJECT (*udpsrc), "close-socket", FALSE, NULL); ret = gst_element_set_state (*udpsrc, GST_STATE_READY); if (ret == GST_STATE_CHANGE_FAILURE) goto error; return TRUE; /* ERRORS */ error: { if (*udpsrc) { gst_element_set_state (*udpsrc, GST_STATE_NULL); g_clear_object (udpsrc); } return FALSE; } } static gboolean alloc_ports_one_family (GstRTSPStream * stream, GSocketFamily family, GSocket * socket_out[2], GstRTSPAddress ** server_addr_out, gboolean multicast, GstRTSPTransport * ct) { GstRTSPStreamPrivate *priv = stream->priv; GSocket *rtp_socket = NULL; GSocket *rtcp_socket; gint tmp_rtp, tmp_rtcp; guint count; GList *rejected_addresses = NULL; GstRTSPAddress *addr = NULL; GInetAddress *inetaddr = NULL; GSocketAddress *rtp_sockaddr = NULL; GSocketAddress *rtcp_sockaddr = NULL; GstRTSPAddressPool *pool; pool = priv->pool; count = 0; /* Start with random port */ tmp_rtp = 0; rtcp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM, G_SOCKET_PROTOCOL_UDP, NULL); if (!rtcp_socket) goto no_udp_protocol; g_socket_set_multicast_loopback (rtcp_socket, FALSE); /* try to allocate 2 UDP ports, the RTP port should be an even * number and the RTCP port should be the next (uneven) port */ again: if (rtp_socket == NULL) { rtp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM, G_SOCKET_PROTOCOL_UDP, NULL); if (!rtp_socket) goto no_udp_protocol; g_socket_set_multicast_loopback (rtp_socket, FALSE); } if ((pool && gst_rtsp_address_pool_has_unicast_addresses (pool)) || multicast) { GstRTSPAddressFlags flags; if (addr) rejected_addresses = g_list_prepend (rejected_addresses, addr); if (!pool) goto no_pool; flags = GST_RTSP_ADDRESS_FLAG_EVEN_PORT; if (multicast) flags |= GST_RTSP_ADDRESS_FLAG_MULTICAST; else flags |= GST_RTSP_ADDRESS_FLAG_UNICAST; if (family == G_SOCKET_FAMILY_IPV6) flags |= GST_RTSP_ADDRESS_FLAG_IPV6; else flags |= GST_RTSP_ADDRESS_FLAG_IPV4; addr = gst_rtsp_address_pool_acquire_address (pool, flags, 2); if (addr == NULL) goto no_address; tmp_rtp = addr->port; g_clear_object (&inetaddr); /* FIXME: Does it really work with the IP_MULTICAST_ALL socket option and * socket control message set in udpsrc? */ if (multicast) inetaddr = g_inet_address_new_any (family); else inetaddr = g_inet_address_new_from_string (addr->address); } else { if (tmp_rtp != 0) { tmp_rtp += 2; if (++count > 20) goto no_ports; } if (inetaddr == NULL) inetaddr = g_inet_address_new_any (family); } rtp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtp); if (!g_socket_bind (rtp_socket, rtp_sockaddr, FALSE, NULL)) { GST_DEBUG_OBJECT (stream, "rtp bind() failed, will try again"); g_object_unref (rtp_sockaddr); goto again; } g_object_unref (rtp_sockaddr); rtp_sockaddr = g_socket_get_local_address (rtp_socket, NULL); if (rtp_sockaddr == NULL || !G_IS_INET_SOCKET_ADDRESS (rtp_sockaddr)) { g_clear_object (&rtp_sockaddr); goto socket_error; } tmp_rtp = g_inet_socket_address_get_port (G_INET_SOCKET_ADDRESS (rtp_sockaddr)); g_object_unref (rtp_sockaddr); /* check if port is even */ if ((tmp_rtp & 1) != 0) { /* port not even, close and allocate another */ tmp_rtp++; g_clear_object (&rtp_socket); goto again; } /* set port */ tmp_rtcp = tmp_rtp + 1; rtcp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtcp); if (!g_socket_bind (rtcp_socket, rtcp_sockaddr, FALSE, NULL)) { GST_DEBUG_OBJECT (stream, "rctp bind() failed, will try again"); g_object_unref (rtcp_sockaddr); g_clear_object (&rtp_socket); goto again; } g_object_unref (rtcp_sockaddr); if (!addr) { addr = g_slice_new0 (GstRTSPAddress); addr->address = g_inet_address_to_string (inetaddr); addr->port = tmp_rtp; addr->n_ports = 2; } g_clear_object (&inetaddr); socket_out[0] = rtp_socket; socket_out[1] = rtcp_socket; *server_addr_out = addr; GST_DEBUG_OBJECT (stream, "allocated address: %s and ports: %d, %d", addr->address, tmp_rtp, tmp_rtcp); g_list_free_full (rejected_addresses, (GDestroyNotify) gst_rtsp_address_free); return TRUE; /* ERRORS */ no_udp_protocol: { GST_ERROR_OBJECT (stream, "failed to allocate UDP ports: protocol error"); goto cleanup; } no_pool: { GST_ERROR_OBJECT (stream, "failed to allocate UDP ports: no address pool specified"); goto cleanup; } no_address: { GST_ERROR_OBJECT (stream, "failed to acquire address from pool"); goto cleanup; } no_ports: { GST_ERROR_OBJECT (stream, "failed to allocate UDP ports: no ports"); goto cleanup; } socket_error: { GST_ERROR_OBJECT (stream, "failed to allocate UDP ports: socket error"); goto cleanup; } cleanup: { if (inetaddr) g_object_unref (inetaddr); g_list_free_full (rejected_addresses, (GDestroyNotify) gst_rtsp_address_free); if (addr) gst_rtsp_address_free (addr); if (rtp_socket) g_object_unref (rtp_socket); if (rtcp_socket) g_object_unref (rtcp_socket); return FALSE; } } /** * gst_rtsp_stream_allocate_udp_sockets: * @stream: a #GstRTSPStream * @family: protocol family * @transport: transport method * @use_client_setttings: Whether to use client settings or not * * Allocates RTP and RTCP ports. * * Returns: %TRUE if the RTP and RTCP sockets have been succeccully allocated. */ gboolean gst_rtsp_stream_allocate_udp_sockets (GstRTSPStream * stream, GSocketFamily family, GstRTSPTransport * ct, gboolean use_transport_settings) { GstRTSPStreamPrivate *priv; gboolean ret = FALSE; GstRTSPLowerTrans transport; gboolean allocated = FALSE; g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE); g_return_val_if_fail (ct != NULL, FALSE); priv = stream->priv; transport = ct->lower_transport; g_mutex_lock (&priv->lock); if (transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) { if (family == G_SOCKET_FAMILY_IPV4 && priv->mcast_addr_v4) allocated = TRUE; else if (family == G_SOCKET_FAMILY_IPV6 && priv->mcast_addr_v6) allocated = TRUE; } else if (transport == GST_RTSP_LOWER_TRANS_UDP) { if (family == G_SOCKET_FAMILY_IPV4 && priv->server_addr_v4) allocated = TRUE; else if (family == G_SOCKET_FAMILY_IPV6 && priv->server_addr_v6) allocated = TRUE; } if (allocated) { g_mutex_unlock (&priv->lock); return TRUE; } if (family == G_SOCKET_FAMILY_IPV4) { /* IPv4 */ if (transport == GST_RTSP_LOWER_TRANS_UDP) { /* UDP unicast */ GST_DEBUG_OBJECT (stream, "GST_RTSP_LOWER_TRANS_UDP, ipv4"); ret = alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV4, priv->socket_v4, &priv->server_addr_v4, FALSE, ct); } else { /* multicast */ GST_DEBUG_OBJECT (stream, "GST_RTSP_LOWER_TRANS_MCAST_UDP, ipv4"); ret = alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV4, priv->mcast_socket_v4, &priv->mcast_addr_v4, TRUE, ct); } } else { /* IPv6 */ if (transport == GST_RTSP_LOWER_TRANS_UDP) { /* unicast */ GST_DEBUG_OBJECT (stream, "GST_RTSP_LOWER_TRANS_UDP, ipv6"); ret = alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV6, priv->socket_v6, &priv->server_addr_v6, FALSE, ct); } else { /* multicast */ GST_DEBUG_OBJECT (stream, "GST_RTSP_LOWER_TRANS_MCAST_UDP, ipv6"); ret = alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV6, priv->mcast_socket_v6, &priv->mcast_addr_v6, TRUE, ct); } } g_mutex_unlock (&priv->lock); return ret; } /** * gst_rtsp_stream_set_client_side: * @stream: a #GstRTSPStream * @client_side: TRUE if this #GstRTSPStream is running on the 'client' side of * an RTSP connection. * * Sets the #GstRTSPStream as a 'client side' stream - used for sending * streams to an RTSP server via RECORD. This has the practical effect * of changing which UDP port numbers are used when setting up the local * side of the stream sending to be either the 'server' or 'client' pair * of a configured UDP transport. */ void gst_rtsp_stream_set_client_side (GstRTSPStream * stream, gboolean client_side) { GstRTSPStreamPrivate *priv; g_return_if_fail (GST_IS_RTSP_STREAM (stream)); priv = stream->priv; g_mutex_lock (&priv->lock); priv->client_side = client_side; g_mutex_unlock (&priv->lock); } /** * gst_rtsp_stream_is_client_side: * @stream: a #GstRTSPStream * * See gst_rtsp_stream_set_client_side() * * Returns: TRUE if this #GstRTSPStream is client-side. */ gboolean gst_rtsp_stream_is_client_side (GstRTSPStream * stream) { GstRTSPStreamPrivate *priv; gboolean ret; g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE); priv = stream->priv; g_mutex_lock (&priv->lock); ret = priv->client_side; g_mutex_unlock (&priv->lock); return ret; } /** * gst_rtsp_stream_get_server_port: * @stream: a #GstRTSPStream * @server_port: (out): result server port * @family: the port family to get * * Fill @server_port with the port pair used by the server. This function can * only be called when @stream has been joined. */ void gst_rtsp_stream_get_server_port (GstRTSPStream * stream, GstRTSPRange * server_port, GSocketFamily family) { GstRTSPStreamPrivate *priv; g_return_if_fail (GST_IS_RTSP_STREAM (stream)); priv = stream->priv; g_return_if_fail (priv->joined_bin != NULL); if (server_port) { server_port->min = 0; server_port->max = 0; } g_mutex_lock (&priv->lock); if (family == G_SOCKET_FAMILY_IPV4) { if (server_port && priv->server_addr_v4) { server_port->min = priv->server_addr_v4->port; server_port->max = priv->server_addr_v4->port + priv->server_addr_v4->n_ports - 1; } } else { if (server_port && priv->server_addr_v6) { server_port->min = priv->server_addr_v6->port; server_port->max = priv->server_addr_v6->port + priv->server_addr_v6->n_ports - 1; } } g_mutex_unlock (&priv->lock); } /** * gst_rtsp_stream_get_rtpsession: * @stream: a #GstRTSPStream * * Get the RTP session of this stream. * * Returns: (transfer full): The RTP session of this stream. Unref after usage. */ GObject * gst_rtsp_stream_get_rtpsession (GstRTSPStream * stream) { GstRTSPStreamPrivate *priv; GObject *session; g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL); priv = stream->priv; g_mutex_lock (&priv->lock); if ((session = priv->session)) g_object_ref (session); g_mutex_unlock (&priv->lock); return session; } /** * gst_rtsp_stream_get_srtp_encoder: * @stream: a #GstRTSPStream * * Get the SRTP encoder for this stream. * * Returns: (transfer full): The SRTP encoder for this stream. Unref after usage. */ GstElement * gst_rtsp_stream_get_srtp_encoder (GstRTSPStream * stream) { GstRTSPStreamPrivate *priv; GstElement *encoder; g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL); priv = stream->priv; g_mutex_lock (&priv->lock); if ((encoder = priv->srtpenc)) g_object_ref (encoder); g_mutex_unlock (&priv->lock); return encoder; } /** * gst_rtsp_stream_get_ssrc: * @stream: a #GstRTSPStream * @ssrc: (out): result ssrc * * Get the SSRC used by the RTP session of this stream. This function can only * be called when @stream has been joined. */ void gst_rtsp_stream_get_ssrc (GstRTSPStream * stream, guint * ssrc) { GstRTSPStreamPrivate *priv; g_return_if_fail (GST_IS_RTSP_STREAM (stream)); priv = stream->priv; g_return_if_fail (priv->joined_bin != NULL); g_mutex_lock (&priv->lock); if (ssrc && priv->session) g_object_get (priv->session, "internal-ssrc", ssrc, NULL); g_mutex_unlock (&priv->lock); } /** * gst_rtsp_stream_set_retransmission_time: * @stream: a #GstRTSPStream * @time: a #GstClockTime * * Set the amount of time to store retransmission packets. */ void gst_rtsp_stream_set_retransmission_time (GstRTSPStream * stream, GstClockTime time) { GST_DEBUG_OBJECT (stream, "set retransmission time %" G_GUINT64_FORMAT, time); g_mutex_lock (&stream->priv->lock); stream->priv->rtx_time = time; if (stream->priv->rtxsend) g_object_set (stream->priv->rtxsend, "max-size-time", GST_TIME_AS_MSECONDS (time), NULL); g_mutex_unlock (&stream->priv->lock); } /** * gst_rtsp_stream_get_retransmission_time: * @stream: a #GstRTSPStream * * Get the amount of time to store retransmission data. * * Returns: the amount of time to store retransmission data. */ GstClockTime gst_rtsp_stream_get_retransmission_time (GstRTSPStream * stream) { GstClockTime ret; g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0); g_mutex_lock (&stream->priv->lock); ret = stream->priv->rtx_time; g_mutex_unlock (&stream->priv->lock); return ret; } /** * gst_rtsp_stream_set_retransmission_pt: * @stream: a #GstRTSPStream * @rtx_pt: a #guint * * Set the payload type (pt) for retransmission of this stream. */ void gst_rtsp_stream_set_retransmission_pt (GstRTSPStream * stream, guint rtx_pt) { g_return_if_fail (GST_IS_RTSP_STREAM (stream)); GST_DEBUG_OBJECT (stream, "set retransmission pt %u", rtx_pt); g_mutex_lock (&stream->priv->lock); stream->priv->rtx_pt = rtx_pt; if (stream->priv->rtxsend) { guint pt = gst_rtsp_stream_get_pt (stream); gchar *pt_s = g_strdup_printf ("%d", pt); GstStructure *rtx_pt_map = gst_structure_new ("application/x-rtp-pt-map", pt_s, G_TYPE_UINT, rtx_pt, NULL); g_object_set (stream->priv->rtxsend, "payload-type-map", rtx_pt_map, NULL); g_free (pt_s); gst_structure_free (rtx_pt_map); } g_mutex_unlock (&stream->priv->lock); } /** * gst_rtsp_stream_get_retransmission_pt: * @stream: a #GstRTSPStream * * Get the payload-type used for retransmission of this stream * * Returns: The retransmission PT. */ guint gst_rtsp_stream_get_retransmission_pt (GstRTSPStream * stream) { guint rtx_pt; g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0); g_mutex_lock (&stream->priv->lock); rtx_pt = stream->priv->rtx_pt; g_mutex_unlock (&stream->priv->lock); return rtx_pt; } /** * gst_rtsp_stream_set_buffer_size: * @stream: a #GstRTSPStream * @size: the buffer size * * Set the size of the UDP transmission buffer (in bytes) * Needs to be set before the stream is joined to a bin. * * Since: 1.6 */ void gst_rtsp_stream_set_buffer_size (GstRTSPStream * stream, guint size) { g_mutex_lock (&stream->priv->lock); stream->priv->buffer_size = size; g_mutex_unlock (&stream->priv->lock); } /** * gst_rtsp_stream_get_buffer_size: * @stream: a #GstRTSPStream * * Get the size of the UDP transmission buffer (in bytes) * * Returns: the size of the UDP TX buffer * * Since: 1.6 */ guint gst_rtsp_stream_get_buffer_size (GstRTSPStream * stream) { guint buffer_size; g_mutex_lock (&stream->priv->lock); buffer_size = stream->priv->buffer_size; g_mutex_unlock (&stream->priv->lock); return buffer_size; } /* executed from streaming thread */ static void caps_notify (GstPad * pad, GParamSpec * unused, GstRTSPStream * stream) { GstRTSPStreamPrivate *priv = stream->priv; GstCaps *newcaps, *oldcaps; newcaps = gst_pad_get_current_caps (pad); GST_INFO ("stream %p received caps %p, %" GST_PTR_FORMAT, stream, newcaps, newcaps); g_mutex_lock (&priv->lock); oldcaps = priv->caps; priv->caps = newcaps; g_mutex_unlock (&priv->lock); if (oldcaps) gst_caps_unref (oldcaps); } static void dump_structure (const GstStructure * s) { gchar *sstr; sstr = gst_structure_to_string (s); GST_INFO ("structure: %s", sstr); g_free (sstr); } static GstRTSPStreamTransport * find_transport (GstRTSPStream * stream, const gchar * rtcp_from) { GstRTSPStreamPrivate *priv = stream->priv; GList *walk; GstRTSPStreamTransport *result = NULL; const gchar *tmp; gchar *dest; guint port; if (rtcp_from == NULL) return NULL; tmp = g_strrstr (rtcp_from, ":"); if (tmp == NULL) return NULL; port = atoi (tmp + 1); dest = g_strndup (rtcp_from, tmp - rtcp_from); g_mutex_lock (&priv->lock); GST_INFO ("finding %s:%d in %d transports", dest, port, g_list_length (priv->transports)); for (walk = priv->transports; walk; walk = g_list_next (walk)) { GstRTSPStreamTransport *trans = walk->data; const GstRTSPTransport *tr; gint min, max; tr = gst_rtsp_stream_transport_get_transport (trans); if (priv->client_side) { /* In client side mode the 'destination' is the RTSP server, so send * to those ports */ min = tr->server_port.min; max = tr->server_port.max; } else { min = tr->client_port.min; max = tr->client_port.max; } if ((g_ascii_strcasecmp (tr->destination, dest) == 0) && (min == port || max == port)) { result = trans; break; } } if (result) g_object_ref (result); g_mutex_unlock (&priv->lock); g_free (dest); return result; } static GstRTSPStreamTransport * check_transport (GObject * source, GstRTSPStream * stream) { GstStructure *stats; GstRTSPStreamTransport *trans; /* see if we have a stream to match with the origin of the RTCP packet */ trans = g_object_get_qdata (source, ssrc_stream_map_key); if (trans == NULL) { g_object_get (source, "stats", &stats, NULL); if (stats) { const gchar *rtcp_from; dump_structure (stats); rtcp_from = gst_structure_get_string (stats, "rtcp-from"); if ((trans = find_transport (stream, rtcp_from))) { GST_INFO ("%p: found transport %p for source %p", stream, trans, source); g_object_set_qdata_full (source, ssrc_stream_map_key, trans, g_object_unref); } gst_structure_free (stats); } } return trans; } static void on_new_ssrc (GObject * session, GObject * source, GstRTSPStream * stream) { GstRTSPStreamTransport *trans; GST_INFO ("%p: new source %p", stream, source); trans = check_transport (source, stream); if (trans) GST_INFO ("%p: source %p for transport %p", stream, source, trans); } static void on_ssrc_sdes (GObject * session, GObject * source, GstRTSPStream * stream) { GST_INFO ("%p: new SDES %p", stream, source); } static void on_ssrc_active (GObject * session, GObject * source, GstRTSPStream * stream) { GstRTSPStreamTransport *trans; trans = check_transport (source, stream); if (trans) { GST_INFO ("%p: source %p in transport %p is active", stream, source, trans); gst_rtsp_stream_transport_keep_alive (trans); } #ifdef DUMP_STATS { GstStructure *stats; g_object_get (source, "stats", &stats, NULL); if (stats) { dump_structure (stats); gst_structure_free (stats); } } #endif } static void on_bye_ssrc (GObject * session, GObject * source, GstRTSPStream * stream) { GST_INFO ("%p: source %p bye", stream, source); } static void on_bye_timeout (GObject * session, GObject * source, GstRTSPStream * stream) { GstRTSPStreamTransport *trans; GST_INFO ("%p: source %p bye timeout", stream, source); if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) { gst_rtsp_stream_transport_set_timed_out (trans, TRUE); g_object_set_qdata (source, ssrc_stream_map_key, NULL); } } static void on_timeout (GObject * session, GObject * source, GstRTSPStream * stream) { GstRTSPStreamTransport *trans; GST_INFO ("%p: source %p timeout", stream, source); if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) { gst_rtsp_stream_transport_set_timed_out (trans, TRUE); g_object_set_qdata (source, ssrc_stream_map_key, NULL); } } static void on_new_sender_ssrc (GObject * session, GObject * source, GstRTSPStream * stream) { GST_INFO ("%p: new sender source %p", stream, source); #ifndef DUMP_STATS { GstStructure *stats; g_object_get (source, "stats", &stats, NULL); if (stats) { dump_structure (stats); gst_structure_free (stats); } } #endif } static void on_sender_ssrc_active (GObject * session, GObject * source, GstRTSPStream * stream) { #ifndef DUMP_STATS { GstStructure *stats; g_object_get (source, "stats", &stats, NULL); if (stats) { dump_structure (stats); gst_structure_free (stats); } } #endif } static void clear_tr_cache (GstRTSPStreamPrivate * priv, gboolean is_rtp) { if (is_rtp) { g_list_foreach (priv->tr_cache_rtp, (GFunc) g_object_unref, NULL); g_list_free (priv->tr_cache_rtp); priv->tr_cache_rtp = NULL; } else { g_list_foreach (priv->tr_cache_rtcp, (GFunc) g_object_unref, NULL); g_list_free (priv->tr_cache_rtcp); priv->tr_cache_rtcp = NULL; } } static GstFlowReturn handle_new_sample (GstAppSink * sink, gpointer user_data) { GstRTSPStreamPrivate *priv; GList *walk; GstSample *sample; GstBuffer *buffer; GstRTSPStream *stream; gboolean is_rtp; sample = gst_app_sink_pull_sample (sink); if (!sample) return GST_FLOW_OK; stream = (GstRTSPStream *) user_data; priv = stream->priv; buffer = gst_sample_get_buffer (sample); is_rtp = GST_ELEMENT_CAST (sink) == priv->appsink[0]; g_mutex_lock (&priv->lock); if (is_rtp) { if (priv->tr_cache_cookie_rtp != priv->transports_cookie) { clear_tr_cache (priv, is_rtp); for (walk = priv->transports; walk; walk = g_list_next (walk)) { GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data; priv->tr_cache_rtp = g_list_prepend (priv->tr_cache_rtp, g_object_ref (tr)); } priv->tr_cache_cookie_rtp = priv->transports_cookie; } } else { if (priv->tr_cache_cookie_rtcp != priv->transports_cookie) { clear_tr_cache (priv, is_rtp); for (walk = priv->transports; walk; walk = g_list_next (walk)) { GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data; priv->tr_cache_rtcp = g_list_prepend (priv->tr_cache_rtcp, g_object_ref (tr)); } priv->tr_cache_cookie_rtcp = priv->transports_cookie; } } g_mutex_unlock (&priv->lock); if (is_rtp) { for (walk = priv->tr_cache_rtp; walk; walk = g_list_next (walk)) { GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data; gst_rtsp_stream_transport_send_rtp (tr, buffer); } } else { for (walk = priv->tr_cache_rtcp; walk; walk = g_list_next (walk)) { GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data; gst_rtsp_stream_transport_send_rtcp (tr, buffer); } } gst_sample_unref (sample); return GST_FLOW_OK; } static GstAppSinkCallbacks sink_cb = { NULL, /* not interested in EOS */ NULL, /* not interested in preroll samples */ handle_new_sample, }; static GstElement * get_rtp_encoder (GstRTSPStream * stream, guint session) { GstRTSPStreamPrivate *priv = stream->priv; if (priv->srtpenc == NULL) { gchar *name; name = g_strdup_printf ("srtpenc_%u", session); priv->srtpenc = gst_element_factory_make ("srtpenc", name); g_free (name); g_object_set (priv->srtpenc, "random-key", TRUE, NULL); } return gst_object_ref (priv->srtpenc); } static GstElement * request_rtp_encoder (GstElement * rtpbin, guint session, GstRTSPStream * stream) { GstRTSPStreamPrivate *priv = stream->priv; GstElement *oldenc, *enc; GstPad *pad; gchar *name; if (priv->idx != session) return NULL; GST_DEBUG_OBJECT (stream, "make RTP encoder for session %u", session); oldenc = priv->srtpenc; enc = get_rtp_encoder (stream, session); name = g_strdup_printf ("rtp_sink_%d", session); pad = gst_element_get_request_pad (enc, name); g_free (name); gst_object_unref (pad); if (oldenc == NULL) g_signal_emit (stream, gst_rtsp_stream_signals[SIGNAL_NEW_RTP_ENCODER], 0, enc); return enc; } static GstElement * request_rtcp_encoder (GstElement * rtpbin, guint session, GstRTSPStream * stream) { GstRTSPStreamPrivate *priv = stream->priv; GstElement *oldenc, *enc; GstPad *pad; gchar *name; if (priv->idx != session) return NULL; GST_DEBUG_OBJECT (stream, "make RTCP encoder for session %u", session); oldenc = priv->srtpenc; enc = get_rtp_encoder (stream, session); name = g_strdup_printf ("rtcp_sink_%d", session); pad = gst_element_get_request_pad (enc, name); g_free (name); gst_object_unref (pad); if (oldenc == NULL) g_signal_emit (stream, gst_rtsp_stream_signals[SIGNAL_NEW_RTCP_ENCODER], 0, enc); return enc; } static GstCaps * request_key (GstElement * srtpdec, guint ssrc, GstRTSPStream * stream) { GstRTSPStreamPrivate *priv = stream->priv; GstCaps *caps; GST_DEBUG ("request key %08x", ssrc); g_mutex_lock (&priv->lock); if ((caps = g_hash_table_lookup (priv->keys, GINT_TO_POINTER (ssrc)))) gst_caps_ref (caps); g_mutex_unlock (&priv->lock); return caps; } static GstElement * request_rtp_rtcp_decoder (GstElement * rtpbin, guint session, GstRTSPStream * stream) { GstRTSPStreamPrivate *priv = stream->priv; if (priv->idx != session) return NULL; if (priv->srtpdec == NULL) { gchar *name; name = g_strdup_printf ("srtpdec_%u", session); priv->srtpdec = gst_element_factory_make ("srtpdec", name); g_free (name); g_signal_connect (priv->srtpdec, "request-key", (GCallback) request_key, stream); } return gst_object_ref (priv->srtpdec); } /** * gst_rtsp_stream_request_aux_sender: * @stream: a #GstRTSPStream * @sessid: the session id * * Creating a rtxsend bin * * Returns: (transfer full): a #GstElement. * * Since: 1.6 */ GstElement * gst_rtsp_stream_request_aux_sender (GstRTSPStream * stream, guint sessid) { GstElement *bin; GstPad *pad; GstStructure *pt_map; gchar *name; guint pt, rtx_pt; gchar *pt_s; g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL); pt = gst_rtsp_stream_get_pt (stream); pt_s = g_strdup_printf ("%u", pt); rtx_pt = stream->priv->rtx_pt; GST_INFO ("creating rtxsend with pt %u to %u", pt, rtx_pt); bin = gst_bin_new (NULL); stream->priv->rtxsend = gst_element_factory_make ("rtprtxsend", NULL); pt_map = gst_structure_new ("application/x-rtp-pt-map", pt_s, G_TYPE_UINT, rtx_pt, NULL); g_object_set (stream->priv->rtxsend, "payload-type-map", pt_map, "max-size-time", GST_TIME_AS_MSECONDS (stream->priv->rtx_time), NULL); g_free (pt_s); gst_structure_free (pt_map); gst_bin_add (GST_BIN (bin), gst_object_ref (stream->priv->rtxsend)); pad = gst_element_get_static_pad (stream->priv->rtxsend, "src"); name = g_strdup_printf ("src_%u", sessid); gst_element_add_pad (bin, gst_ghost_pad_new (name, pad)); g_free (name); gst_object_unref (pad); pad = gst_element_get_static_pad (stream->priv->rtxsend, "sink"); name = g_strdup_printf ("sink_%u", sessid); gst_element_add_pad (bin, gst_ghost_pad_new (name, pad)); g_free (name); gst_object_unref (pad); return bin; } /** * gst_rtsp_stream_set_pt_map: * @stream: a #GstRTSPStream * @pt: the pt * @caps: a #GstCaps * * Configure a pt map between @pt and @caps. */ void gst_rtsp_stream_set_pt_map (GstRTSPStream * stream, guint pt, GstCaps * caps) { GstRTSPStreamPrivate *priv = stream->priv; g_mutex_lock (&priv->lock); g_hash_table_insert (priv->ptmap, GINT_TO_POINTER (pt), gst_caps_ref (caps)); g_mutex_unlock (&priv->lock); } /** * gst_rtsp_stream_set_publish_clock_mode: * @stream: a #GstRTSPStream * @mode: the clock publish mode * * Sets if and how the stream clock should be published according to RFC7273. * * Since: 1.8 */ void gst_rtsp_stream_set_publish_clock_mode (GstRTSPStream * stream, GstRTSPPublishClockMode mode) { GstRTSPStreamPrivate *priv; priv = stream->priv; g_mutex_lock (&priv->lock); priv->publish_clock_mode = mode; g_mutex_unlock (&priv->lock); } /** * gst_rtsp_stream_get_publish_clock_mode: * @stream: a #GstRTSPStream * * Gets if and how the stream clock should be published according to RFC7273. * * Returns: The GstRTSPPublishClockMode * * Since: 1.8 */ GstRTSPPublishClockMode gst_rtsp_stream_get_publish_clock_mode (GstRTSPStream * stream) { GstRTSPStreamPrivate *priv; GstRTSPPublishClockMode ret; priv = stream->priv; g_mutex_lock (&priv->lock); ret = priv->publish_clock_mode; g_mutex_unlock (&priv->lock); return ret; } static GstCaps * request_pt_map (GstElement * rtpbin, guint session, guint pt, GstRTSPStream * stream) { GstRTSPStreamPrivate *priv = stream->priv; GstCaps *caps = NULL; g_mutex_lock (&priv->lock); if (priv->idx == session) { caps = g_hash_table_lookup (priv->ptmap, GINT_TO_POINTER (pt)); if (caps) { GST_DEBUG ("Stream %p, pt %u: caps %" GST_PTR_FORMAT, stream, pt, caps); gst_caps_ref (caps); } else { GST_DEBUG ("Stream %p, pt %u: no caps", stream, pt); } } g_mutex_unlock (&priv->lock); return caps; } static void pad_added (GstElement * rtpbin, GstPad * pad, GstRTSPStream * stream) { GstRTSPStreamPrivate *priv = stream->priv; gchar *name; GstPadLinkReturn ret; guint sessid; GST_DEBUG ("Stream %p added pad %s:%s for pad %s:%s", stream, GST_DEBUG_PAD_NAME (pad), GST_DEBUG_PAD_NAME (priv->sinkpad)); name = gst_pad_get_name (pad); if (sscanf (name, "recv_rtp_src_%u", &sessid) != 1) { g_free (name); return; } g_free (name); if (priv->idx != sessid) return; if (gst_pad_is_linked (priv->sinkpad)) { GST_WARNING ("Stream %p: Pad %s:%s is linked already", stream, GST_DEBUG_PAD_NAME (priv->sinkpad)); return; } /* link the RTP pad to the session manager, it should not really fail unless * this is not really an RTP pad */ ret = gst_pad_link (pad, priv->sinkpad); if (ret != GST_PAD_LINK_OK) goto link_failed; priv->recv_rtp_src = gst_object_ref (pad); return; /* ERRORS */ link_failed: { GST_ERROR ("Stream %p: Failed to link pads %s:%s and %s:%s", stream, GST_DEBUG_PAD_NAME (pad), GST_DEBUG_PAD_NAME (priv->sinkpad)); } } static void on_npt_stop (GstElement * rtpbin, guint session, guint ssrc, GstRTSPStream * stream) { /* TODO: What to do here other than this? */ GST_DEBUG ("Stream %p: Got EOS", stream); gst_pad_send_event (stream->priv->sinkpad, gst_event_new_eos ()); } typedef struct _ProbeData ProbeData; struct _ProbeData { GstRTSPStream *stream; /* existing sink, already linked to tee */ GstElement *sink1; /* new sink, about to be linked */ GstElement *sink2; /* new queue element, that will be linked to tee and sink1 */ GstElement **queue1; /* new queue element, that will be linked to tee and sink2 */ GstElement **queue2; GstPad *sink_pad; GstPad *tee_pad; guint index; }; static void free_cb_data (gpointer user_data) { ProbeData *data = user_data; gst_object_unref (data->stream); gst_object_unref (data->sink1); gst_object_unref (data->sink2); gst_object_unref (data->sink_pad); gst_object_unref (data->tee_pad); g_free (data); } static void create_and_plug_queue_to_unlinked_stream (GstRTSPStream * stream, GstElement * tee, GstElement * sink, GstElement ** queue) { GstRTSPStreamPrivate *priv = stream->priv; GstPad *tee_pad; GstPad *queue_pad; GstPad *sink_pad; /* create queue for the new stream */ *queue = gst_element_factory_make ("queue", NULL); g_object_set (*queue, "max-size-buffers", 1, "max-size-bytes", 0, "max-size-time", G_GINT64_CONSTANT (0), NULL); gst_bin_add (priv->joined_bin, *queue); /* link tee to queue */ tee_pad = gst_element_get_request_pad (tee, "src_%u"); queue_pad = gst_element_get_static_pad (*queue, "sink"); gst_pad_link (tee_pad, queue_pad); gst_object_unref (queue_pad); gst_object_unref (tee_pad); /* link queue to sink */ queue_pad = gst_element_get_static_pad (*queue, "src"); sink_pad = gst_element_get_static_pad (sink, "sink"); gst_pad_link (queue_pad, sink_pad); gst_object_unref (queue_pad); gst_object_unref (sink_pad); gst_element_sync_state_with_parent (sink); gst_element_sync_state_with_parent (*queue); } static GstPadProbeReturn create_and_plug_queue_to_linked_stream_probe_cb (GstPad * inpad, GstPadProbeInfo * info, gpointer user_data) { GstRTSPStreamPrivate *priv; ProbeData *data = user_data; GstRTSPStream *stream; GstElement **queue1; GstElement **queue2; GstPad *sink_pad; GstPad *tee_pad; GstPad *queue_pad; guint index; stream = data->stream; priv = stream->priv; queue1 = data->queue1; queue2 = data->queue2; sink_pad = data->sink_pad; tee_pad = data->tee_pad; index = data->index; /* unlink tee and the existing sink: * .-----. .---------. * | tee | | sink1 | * sink src->sink | * '-----' '---------' */ g_assert (gst_pad_unlink (tee_pad, sink_pad)); /* add queue to the already existing stream */ *queue1 = gst_element_factory_make ("queue", NULL); g_object_set (*queue1, "max-size-buffers", 1, "max-size-bytes", 0, "max-size-time", G_GINT64_CONSTANT (0), NULL); gst_bin_add (priv->joined_bin, *queue1); /* link tee, queue and sink: * .-----. .---------. .---------. * | tee | | queue1 | | sink1 | * sink src->sink src->sink | * '-----' '---------' '---------' */ queue_pad = gst_element_get_static_pad (*queue1, "sink"); gst_pad_link (tee_pad, queue_pad); gst_object_unref (queue_pad); queue_pad = gst_element_get_static_pad (*queue1, "src"); gst_pad_link (queue_pad, sink_pad); gst_object_unref (queue_pad); gst_element_sync_state_with_parent (*queue1); /* create queue and link it to tee and the new sink */ create_and_plug_queue_to_unlinked_stream (stream, priv->tee[index], data->sink2, queue2); /* the final stream: * * .-----. .---------. .---------. * | tee | | queue1 | | sink1 | * sink src->sink src->sink | * | | '---------' '---------' * | | .---------. .---------. * | | | queue2 | | sink2 | * | src->sink src->sink | * '-----' '---------' '---------' */ return GST_PAD_PROBE_REMOVE; } static void create_and_plug_queue_to_linked_stream (GstRTSPStream * stream, GstElement * sink1, GstElement * sink2, guint index, GstElement ** queue1, GstElement ** queue2) { ProbeData *data; data = g_new0 (ProbeData, 1); data->stream = gst_object_ref (stream); data->sink1 = gst_object_ref (sink1); data->sink2 = gst_object_ref (sink2); data->queue1 = queue1; data->queue2 = queue2; data->index = index; data->sink_pad = gst_element_get_static_pad (sink1, "sink"); g_assert (data->sink_pad); data->tee_pad = gst_pad_get_peer (data->sink_pad); g_assert (data->tee_pad); gst_pad_add_probe (data->tee_pad, GST_PAD_PROBE_TYPE_IDLE, create_and_plug_queue_to_linked_stream_probe_cb, data, free_cb_data); } static void plug_udp_sink (GstRTSPStream * stream, GstElement * sink_to_plug, GstElement ** queue_to_plug, guint index, gboolean is_mcast) { GstRTSPStreamPrivate *priv = stream->priv; GstElement *existing_sink; if (is_mcast) existing_sink = priv->udpsink[index]; else existing_sink = priv->mcast_udpsink[index]; GST_DEBUG_OBJECT (stream, "plug %s sink", is_mcast ? "mcast" : "udp"); /* add sink to the bin */ gst_bin_add (priv->joined_bin, sink_to_plug); if (priv->appsink[index] && existing_sink) { /* queues are already added for the existing stream, add one for the newly added udp stream */ create_and_plug_queue_to_unlinked_stream (stream, priv->tee[index], sink_to_plug, queue_to_plug); } else if (priv->appsink[index] || existing_sink) { GstElement **queue; GstElement *element; /* add queue to the already existing stream plus the newly created udp stream */ if (priv->appsink[index]) { element = priv->appsink[index]; queue = &priv->appqueue[index]; } else { element = existing_sink; if (is_mcast) queue = &priv->udpqueue[index]; else queue = &priv->mcast_udpqueue[index]; } create_and_plug_queue_to_linked_stream (stream, element, sink_to_plug, index, queue, queue_to_plug); } else { GstPad *tee_pad; GstPad *sink_pad; GST_DEBUG_OBJECT (stream, "creating first stream"); /* no need to add queues */ tee_pad = gst_element_get_request_pad (priv->tee[index], "src_%u"); sink_pad = gst_element_get_static_pad (sink_to_plug, "sink"); gst_pad_link (tee_pad, sink_pad); gst_object_unref (tee_pad); gst_object_unref (sink_pad); } gst_element_sync_state_with_parent (sink_to_plug); } static void plug_tcp_sink (GstRTSPStream * stream, guint index) { GstRTSPStreamPrivate *priv = stream->priv; GST_DEBUG_OBJECT (stream, "plug tcp sink"); /* add sink to the bin */ gst_bin_add (priv->joined_bin, priv->appsink[index]); if (priv->mcast_udpsink[index] && priv->udpsink[index]) { /* queues are already added for the existing stream, add one for the newly added tcp stream */ create_and_plug_queue_to_unlinked_stream (stream, priv->tee[index], priv->appsink[index], &priv->appqueue[index]); } else if (priv->mcast_udpsink[index] || priv->udpsink[index]) { GstElement **queue; GstElement *element; /* add queue to the already existing stream plus the newly created tcp stream */ if (priv->mcast_udpsink[index]) { element = priv->mcast_udpsink[index]; queue = &priv->mcast_udpqueue[index]; } else { element = priv->udpsink[index]; queue = &priv->udpqueue[index]; } create_and_plug_queue_to_linked_stream (stream, element, priv->appsink[index], index, queue, &priv->appqueue[index]); } else { GstPad *tee_pad; GstPad *sink_pad; /* no need to add queues */ tee_pad = gst_element_get_request_pad (priv->tee[index], "src_%u"); sink_pad = gst_element_get_static_pad (priv->appsink[index], "sink"); gst_pad_link (tee_pad, sink_pad); gst_object_unref (tee_pad); gst_object_unref (sink_pad); } gst_element_sync_state_with_parent (priv->appsink[index]); } static void plug_sink (GstRTSPStream * stream, const GstRTSPTransport * transport, guint index) { GstRTSPStreamPrivate *priv; gboolean is_tcp, is_udp, is_mcast; priv = stream->priv; is_tcp = transport->lower_transport == GST_RTSP_LOWER_TRANS_TCP; is_udp = transport->lower_transport == GST_RTSP_LOWER_TRANS_UDP; is_mcast = transport->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST; if (is_udp) plug_udp_sink (stream, priv->udpsink[index], &priv->udpqueue[index], index, FALSE); else if (is_mcast) plug_udp_sink (stream, priv->mcast_udpsink[index], &priv->mcast_udpqueue[index], index, TRUE); else if (is_tcp) plug_tcp_sink (stream, index); } /* must be called with lock */ static gboolean create_sender_part (GstRTSPStream * stream, const GstRTSPTransport * transport) { GstRTSPStreamPrivate *priv; GstPad *pad; GstBin *bin; gboolean is_tcp, is_udp, is_mcast; gint i; GST_DEBUG_OBJECT (stream, "create sender part"); priv = stream->priv; bin = priv->joined_bin; is_tcp = transport->lower_transport == GST_RTSP_LOWER_TRANS_TCP; is_udp = transport->lower_transport == GST_RTSP_LOWER_TRANS_UDP; is_mcast = transport->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST; GST_DEBUG_OBJECT (stream, "tcp: %d, udp: %d, mcast: %d", is_tcp, is_udp, is_mcast); if (is_udp && !priv->server_addr_v4 && !priv->server_addr_v6) { GST_WARNING_OBJECT (stream, "no sockets assigned for UDP"); return FALSE; } if (is_mcast && !priv->mcast_addr_v4 && !priv->mcast_addr_v6) { GST_WARNING_OBJECT (stream, "no sockets assigned for UDP multicast"); return FALSE; } for (i = 0; i < 2; i++) { gboolean link_tee = FALSE; /* For the sender we create this bit of pipeline for both * RTP and RTCP. * Initially there will be only one active transport for * the stream, so the pipeline will look like this: * * .--------. .-----. .---------. * | rtpbin | | tee | | sink | * | send->sink src->sink | * '--------' '-----' '---------' * * For each new transport, the already existing branch will * be reconfigured by adding a queue element: * * .--------. .-----. .---------. .---------. * | rtpbin | | tee | | queue | | udpsink | * | send->sink src->sink src->sink | * '--------' | | '---------' '---------' * | | .---------. .---------. * | | | queue | | udpsink | * | src->sink src->sink | * | | '---------' '---------' * | | .---------. .---------. * | | | queue | | appsink | * | src->sink src->sink | * '-----' '---------' '---------' */ /* Only link the RTP send src if we're going to send RTP, link * the RTCP send src always */ if (!priv->srcpad && i == 0) continue; if (!priv->tee[i]) { /* make tee for RTP/RTCP */ priv->tee[i] = gst_element_factory_make ("tee", NULL); gst_bin_add (bin, priv->tee[i]); link_tee = TRUE; } if (is_udp && !priv->udpsink[i]) { /* we create only one pair of udpsinks for IPv4 and IPv6 */ create_and_configure_udpsink (stream, &priv->udpsink[i], priv->socket_v4[i], priv->socket_v6[i], FALSE, (i == 0)); plug_sink (stream, transport, i); } else if (is_mcast && !priv->mcast_udpsink[i]) { /* we create only one pair of mcast-udpsinks for IPv4 and IPv6 */ create_and_configure_udpsink (stream, &priv->mcast_udpsink[i], priv->mcast_socket_v4[i], priv->mcast_socket_v6[i], TRUE, (i == 0)); plug_sink (stream, transport, i); } else if (is_tcp && !priv->appsink[i]) { /* make appsink */ priv->appsink[i] = gst_element_factory_make ("appsink", NULL); g_object_set (priv->appsink[i], "emit-signals", FALSE, NULL); /* we need to set sync and preroll to FALSE for the sink to avoid * deadlock. This is only needed for sink sending RTCP data. */ if (i == 1) g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE, NULL); gst_app_sink_set_callbacks (GST_APP_SINK_CAST (priv->appsink[i]), &sink_cb, stream, NULL); plug_sink (stream, transport, i); } if (link_tee) { /* and link to rtpbin send pad */ gst_element_sync_state_with_parent (priv->tee[i]); pad = gst_element_get_static_pad (priv->tee[i], "sink"); gst_pad_link (priv->send_src[i], pad); gst_object_unref (pad); } } return TRUE; } /* must be called with lock */ static void plug_src (GstRTSPStream * stream, GstBin * bin, GstElement * src, GstElement * funnel) { GstRTSPStreamPrivate *priv; GstPad *pad, *selpad; priv = stream->priv; if (priv->srcpad) { /* we set and keep these to playing so that they don't cause NO_PREROLL return * values. This is only relevant for PLAY pipelines */ gst_element_set_state (src, GST_STATE_PLAYING); gst_element_set_locked_state (src, TRUE); } /* add src */ gst_bin_add (bin, src); /* and link to the funnel */ selpad = gst_element_get_request_pad (funnel, "sink_%u"); pad = gst_element_get_static_pad (src, "src"); gst_pad_link (pad, selpad); gst_object_unref (pad); gst_object_unref (selpad); } /* must be called with lock */ static gboolean create_receiver_part (GstRTSPStream * stream, const GstRTSPTransport * transport) { GstRTSPStreamPrivate *priv; GstPad *pad; GstBin *bin; gboolean tcp; gboolean udp; gboolean mcast; gint i; GST_DEBUG_OBJECT (stream, "create receiver part"); priv = stream->priv; bin = priv->joined_bin; tcp = transport->lower_transport == GST_RTSP_LOWER_TRANS_TCP; udp = transport->lower_transport == GST_RTSP_LOWER_TRANS_UDP; mcast = transport->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST; for (i = 0; i < 2; i++) { /* For the receiver we create this bit of pipeline for both * RTP and RTCP. We receive RTP/RTCP on appsrc and udpsrc * and it is all funneled into the rtpbin receive pad. * * * .--------. .--------. .--------. * | udpsrc | | funnel | | rtpbin | * | RTP src->sink src->sink | * '--------' | | | | * .--------. | | | | * | appsrc | | | | | * | RTP src->sink | | | * '--------' '--------' | | * | | * .--------. .--------. | | * | udpsrc | | funnel | | | * | RTCP src->sink src->sink | * '--------' | | '--------' * .--------. | | * | appsrc | | | * | RTCP src->sink | * '--------' '--------' */ if (!priv->sinkpad && i == 0) { /* Only connect recv RTP sink if we expect to receive RTP. Connect recv * RTCP sink always */ continue; } /* make funnel for the RTP/RTCP receivers */ priv->funnel[i] = gst_element_factory_make ("funnel", NULL); gst_bin_add (bin, priv->funnel[i]); pad = gst_element_get_static_pad (priv->funnel[i], "src"); gst_pad_link (pad, priv->recv_sink[i]); gst_object_unref (pad); if (udp && !priv->udpsrc_v4[i] && priv->server_addr_v4) { GST_DEBUG_OBJECT (stream, "udp IPv4, create and configure udpsources"); if (!create_and_configure_udpsource (&priv->udpsrc_v4[i], priv->socket_v4[i])) goto udpsrc_error; plug_src (stream, bin, priv->udpsrc_v4[i], priv->funnel[i]); } if (udp && !priv->udpsrc_v6[i] && priv->server_addr_v6) { GST_DEBUG_OBJECT (stream, "udp IPv6, create and configure udpsources"); if (!create_and_configure_udpsource (&priv->udpsrc_v6[i], priv->socket_v6[i])) goto udpsrc_error; plug_src (stream, bin, priv->udpsrc_v6[i], priv->funnel[i]); } if (mcast && !priv->mcast_udpsrc_v4[i] && priv->mcast_addr_v4) { GST_DEBUG_OBJECT (stream, "mcast IPv4, create and configure udpsources"); if (!create_and_configure_udpsource (&priv->mcast_udpsrc_v4[i], priv->mcast_socket_v4[i])) goto mcast_udpsrc_error; plug_src (stream, bin, priv->mcast_udpsrc_v4[i], priv->funnel[i]); } if (mcast && !priv->mcast_udpsrc_v6[i] && priv->mcast_addr_v6) { GST_DEBUG_OBJECT (stream, "mcast IPv6, create and configure udpsources"); if (!create_and_configure_udpsource (&priv->mcast_udpsrc_v6[i], priv->mcast_socket_v6[i])) goto mcast_udpsrc_error; plug_src (stream, bin, priv->mcast_udpsrc_v6[i], priv->funnel[i]); } if (tcp && !priv->appsrc[i]) { /* make and add appsrc */ priv->appsrc[i] = gst_element_factory_make ("appsrc", NULL); priv->appsrc_base_time[i] = -1; g_object_set (priv->appsrc[i], "format", GST_FORMAT_TIME, "is-live", TRUE, NULL); plug_src (stream, bin, priv->appsrc[i], priv->funnel[i]); } gst_element_sync_state_with_parent (priv->funnel[i]); } return TRUE; mcast_udpsrc_error: udpsrc_error: return FALSE; } static gboolean check_mcast_part_for_transport (GstRTSPStream * stream, const GstRTSPTransport * tr) { GstRTSPStreamPrivate *priv = stream->priv; GInetAddress *inetaddr; GSocketFamily family; GstRTSPAddress *mcast_addr; /* Check if it's a ipv4 or ipv6 transport */ inetaddr = g_inet_address_new_from_string (tr->destination); family = g_inet_address_get_family (inetaddr); g_object_unref (inetaddr); /* Select fields corresponding to the family */ if (family == G_SOCKET_FAMILY_IPV4) { mcast_addr = priv->mcast_addr_v4; } else { mcast_addr = priv->mcast_addr_v6; } /* We support only one mcast group per family, make sure this transport * matches it. */ if (!mcast_addr) goto no_addr; if (g_ascii_strcasecmp (tr->destination, mcast_addr->address) != 0 || tr->port.min != mcast_addr->port || tr->port.max != mcast_addr->port + mcast_addr->n_ports - 1 || tr->ttl != mcast_addr->ttl) goto wrong_addr; return TRUE; no_addr: { GST_WARNING_OBJECT (stream, "Adding mcast transport, but no mcast address " "has been reserved"); return FALSE; } wrong_addr: { GST_WARNING_OBJECT (stream, "Adding mcast transport, but it doesn't match " "the reserved address"); return FALSE; } } /** * gst_rtsp_stream_join_bin: * @stream: a #GstRTSPStream * @bin: (transfer none): a #GstBin to join * @rtpbin: (transfer none): a rtpbin element in @bin * @state: the target state of the new elements * * Join the #GstBin @bin that contains the element @rtpbin. * * @stream will link to @rtpbin, which must be inside @bin. The elements * added to @bin will be set to the state given in @state. * * Returns: %TRUE on success. */ gboolean gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin, GstElement * rtpbin, GstState state) { GstRTSPStreamPrivate *priv; guint idx; gchar *name; GstPadLinkReturn ret; g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE); g_return_val_if_fail (GST_IS_BIN (bin), FALSE); g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE); priv = stream->priv; g_mutex_lock (&priv->lock); if (priv->joined_bin != NULL) goto was_joined; /* create a session with the same index as the stream */ idx = priv->idx; GST_INFO ("stream %p joining bin as session %u", stream, idx); if (priv->profiles & GST_RTSP_PROFILE_SAVP || priv->profiles & GST_RTSP_PROFILE_SAVPF) { /* For SRTP */ g_signal_connect (rtpbin, "request-rtp-encoder", (GCallback) request_rtp_encoder, stream); g_signal_connect (rtpbin, "request-rtcp-encoder", (GCallback) request_rtcp_encoder, stream); g_signal_connect (rtpbin, "request-rtp-decoder", (GCallback) request_rtp_rtcp_decoder, stream); g_signal_connect (rtpbin, "request-rtcp-decoder", (GCallback) request_rtp_rtcp_decoder, stream); } if (priv->sinkpad) { g_signal_connect (rtpbin, "request-pt-map", (GCallback) request_pt_map, stream); } /* get pads from the RTP session element for sending and receiving * RTP/RTCP*/ if (priv->srcpad) { /* get a pad for sending RTP */ name = g_strdup_printf ("send_rtp_sink_%u", idx); priv->send_rtp_sink = gst_element_get_request_pad (rtpbin, name); g_free (name); /* link the RTP pad to the session manager, it should not really fail unless * this is not really an RTP pad */ ret = gst_pad_link (priv->srcpad, priv->send_rtp_sink); if (ret != GST_PAD_LINK_OK) goto link_failed; name = g_strdup_printf ("send_rtp_src_%u", idx); priv->send_src[0] = gst_element_get_static_pad (rtpbin, name); g_free (name); } else { /* RECORD case: need to connect our sinkpad from here */ g_signal_connect (rtpbin, "pad-added", (GCallback) pad_added, stream); /* EOS */ g_signal_connect (rtpbin, "on-npt-stop", (GCallback) on_npt_stop, stream); name = g_strdup_printf ("recv_rtp_sink_%u", idx); priv->recv_sink[0] = gst_element_get_request_pad (rtpbin, name); g_free (name); } name = g_strdup_printf ("send_rtcp_src_%u", idx); priv->send_src[1] = gst_element_get_request_pad (rtpbin, name); g_free (name); name = g_strdup_printf ("recv_rtcp_sink_%u", idx); priv->recv_sink[1] = gst_element_get_request_pad (rtpbin, name); g_free (name); /* get the session */ g_signal_emit_by_name (rtpbin, "get-internal-session", idx, &priv->session); g_signal_connect (priv->session, "on-new-ssrc", (GCallback) on_new_ssrc, stream); g_signal_connect (priv->session, "on-ssrc-sdes", (GCallback) on_ssrc_sdes, stream); g_signal_connect (priv->session, "on-ssrc-active", (GCallback) on_ssrc_active, stream); g_signal_connect (priv->session, "on-bye-ssrc", (GCallback) on_bye_ssrc, stream); g_signal_connect (priv->session, "on-bye-timeout", (GCallback) on_bye_timeout, stream); g_signal_connect (priv->session, "on-timeout", (GCallback) on_timeout, stream); /* signal for sender ssrc */ g_signal_connect (priv->session, "on-new-sender-ssrc", (GCallback) on_new_sender_ssrc, stream); g_signal_connect (priv->session, "on-sender-ssrc-active", (GCallback) on_sender_ssrc_active, stream); if (priv->srcpad) { /* be notified of caps changes */ priv->caps_sig = g_signal_connect (priv->send_src[0], "notify::caps", (GCallback) caps_notify, stream); priv->caps = gst_pad_get_current_caps (priv->send_src[0]); } priv->joined_bin = bin; GST_DEBUG_OBJECT (stream, "successfully joined bin"); g_mutex_unlock (&priv->lock); return TRUE; /* ERRORS */ was_joined: { g_mutex_unlock (&priv->lock); return TRUE; } link_failed: { GST_WARNING ("failed to link stream %u", idx); gst_object_unref (priv->send_rtp_sink); priv->send_rtp_sink = NULL; g_mutex_unlock (&priv->lock); return FALSE; } } static void clear_element (GstBin * bin, GstElement ** elementptr) { if (*elementptr) { gst_element_set_locked_state (*elementptr, FALSE); gst_element_set_state (*elementptr, GST_STATE_NULL); if (GST_ELEMENT_PARENT (*elementptr)) gst_bin_remove (bin, *elementptr); else gst_object_unref (*elementptr); *elementptr = NULL; } } /** * gst_rtsp_stream_leave_bin: * @stream: a #GstRTSPStream * @bin: (transfer none): a #GstBin * @rtpbin: (transfer none): a rtpbin #GstElement * * Remove the elements of @stream from @bin. * * Return: %TRUE on success. */ gboolean gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin, GstElement * rtpbin) { GstRTSPStreamPrivate *priv; gint i; g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE); g_return_val_if_fail (GST_IS_BIN (bin), FALSE); g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE); priv = stream->priv; g_mutex_lock (&priv->lock); if (priv->joined_bin == NULL) goto was_not_joined; if (priv->joined_bin != bin) goto wrong_bin; priv->joined_bin = NULL; /* all transports must be removed by now */ if (priv->transports != NULL) goto transports_not_removed; clear_tr_cache (priv, TRUE); clear_tr_cache (priv, FALSE); GST_INFO ("stream %p leaving bin", stream); if (priv->srcpad) { gst_pad_unlink (priv->srcpad, priv->send_rtp_sink); g_signal_handler_disconnect (priv->send_src[0], priv->caps_sig); gst_element_release_request_pad (rtpbin, priv->send_rtp_sink); gst_object_unref (priv->send_rtp_sink); priv->send_rtp_sink = NULL; } else if (priv->recv_rtp_src) { gst_pad_unlink (priv->recv_rtp_src, priv->sinkpad); gst_object_unref (priv->recv_rtp_src); priv->recv_rtp_src = NULL; } for (i = 0; i < 2; i++) { clear_element (bin, &priv->udpsrc_v4[i]); clear_element (bin, &priv->udpsrc_v6[i]); clear_element (bin, &priv->udpqueue[i]); clear_element (bin, &priv->udpsink[i]); clear_element (bin, &priv->mcast_udpsrc_v4[i]); clear_element (bin, &priv->mcast_udpsrc_v6[i]); clear_element (bin, &priv->mcast_udpqueue[i]); clear_element (bin, &priv->mcast_udpsink[i]); clear_element (bin, &priv->appsrc[i]); clear_element (bin, &priv->appqueue[i]); clear_element (bin, &priv->appsink[i]); clear_element (bin, &priv->tee[i]); clear_element (bin, &priv->funnel[i]); if (priv->sinkpad || i == 1) { gst_element_release_request_pad (rtpbin, priv->recv_sink[i]); gst_object_unref (priv->recv_sink[i]); priv->recv_sink[i] = NULL; } } if (priv->srcpad) { gst_object_unref (priv->send_src[0]); priv->send_src[0] = NULL; } gst_element_release_request_pad (rtpbin, priv->send_src[1]); gst_object_unref (priv->send_src[1]); priv->send_src[1] = NULL; g_object_unref (priv->session); priv->session = NULL; if (priv->caps) gst_caps_unref (priv->caps); priv->caps = NULL; if (priv->srtpenc) gst_object_unref (priv->srtpenc); if (priv->srtpdec) gst_object_unref (priv->srtpdec); if (priv->mcast_addr_v4) gst_rtsp_address_free (priv->mcast_addr_v4); priv->mcast_addr_v4 = NULL; if (priv->mcast_addr_v6) gst_rtsp_address_free (priv->mcast_addr_v6); priv->mcast_addr_v6 = NULL; if (priv->server_addr_v4) gst_rtsp_address_free (priv->server_addr_v4); priv->server_addr_v4 = NULL; if (priv->server_addr_v6) gst_rtsp_address_free (priv->server_addr_v6); priv->server_addr_v6 = NULL; g_mutex_unlock (&priv->lock); return TRUE; was_not_joined: { g_mutex_unlock (&priv->lock); return TRUE; } transports_not_removed: { GST_ERROR_OBJECT (stream, "can't leave bin (transports not removed)"); g_mutex_unlock (&priv->lock); return FALSE; } wrong_bin: { GST_ERROR_OBJECT (stream, "leaving the wrong bin"); g_mutex_unlock (&priv->lock); return FALSE; } } /** * gst_rtsp_stream_get_joined_bin: * @stream: a #GstRTSPStream * * Get the previous joined bin with gst_rtsp_stream_join_bin() or NULL. * * Return: (transfer full): the joined bin or NULL. */ GstBin * gst_rtsp_stream_get_joined_bin (GstRTSPStream * stream) { GstRTSPStreamPrivate *priv; GstBin *bin = NULL; g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE); priv = stream->priv; g_mutex_lock (&priv->lock); bin = priv->joined_bin ? gst_object_ref (priv->joined_bin) : NULL; g_mutex_unlock (&priv->lock); return bin; } /** * gst_rtsp_stream_get_rtpinfo: * @stream: a #GstRTSPStream * @rtptime: (allow-none): result RTP timestamp * @seq: (allow-none): result RTP seqnum * @clock_rate: (allow-none): the clock rate * @running_time: result running-time * * Retrieve the current rtptime, seq and running-time. This is used to * construct a RTPInfo reply header. * * Returns: %TRUE when rtptime, seq and running-time could be determined. */ gboolean gst_rtsp_stream_get_rtpinfo (GstRTSPStream * stream, guint * rtptime, guint * seq, guint * clock_rate, GstClockTime * running_time) { GstRTSPStreamPrivate *priv; GstStructure *stats; GObjectClass *payobjclass; g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE); priv = stream->priv; payobjclass = G_OBJECT_GET_CLASS (priv->payloader); g_mutex_lock (&priv->lock); /* First try to extract the information from the last buffer on the sinks. * This will have a more accurate sequence number and timestamp, as between * the payloader and the sink there can be some queues */ if (priv->udpsink[0] || priv->appsink[0]) { GstSample *last_sample; if (priv->udpsink[0]) g_object_get (priv->udpsink[0], "last-sample", &last_sample, NULL); else g_object_get (priv->appsink[0], "last-sample", &last_sample, NULL); if (last_sample) { GstCaps *caps; GstBuffer *buffer; GstSegment *segment; GstStructure *s; GstRTPBuffer rtp_buffer = GST_RTP_BUFFER_INIT; caps = gst_sample_get_caps (last_sample); buffer = gst_sample_get_buffer (last_sample); segment = gst_sample_get_segment (last_sample); s = gst_caps_get_structure (caps, 0); if (gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp_buffer)) { guint ssrc_buf = gst_rtp_buffer_get_ssrc (&rtp_buffer); guint ssrc_stream = 0; if (gst_structure_has_field_typed (s, "ssrc", G_TYPE_UINT) && gst_structure_get_uint (s, "ssrc", &ssrc_stream) && ssrc_buf != ssrc_stream) { /* Skip buffers from auxiliary streams. */ GST_DEBUG_OBJECT (stream, "not a buffer from the payloader, SSRC: %08x", ssrc_buf); gst_rtp_buffer_unmap (&rtp_buffer); gst_sample_unref (last_sample); goto stats; } if (seq) { *seq = gst_rtp_buffer_get_seq (&rtp_buffer); } if (rtptime) { *rtptime = gst_rtp_buffer_get_timestamp (&rtp_buffer); } gst_rtp_buffer_unmap (&rtp_buffer); if (running_time) { *running_time = gst_segment_to_running_time (segment, GST_FORMAT_TIME, GST_BUFFER_TIMESTAMP (buffer)); } if (clock_rate) { gst_structure_get_int (s, "clock-rate", (gint *) clock_rate); if (*clock_rate == 0 && running_time) *running_time = GST_CLOCK_TIME_NONE; } gst_sample_unref (last_sample); goto done; } else { gst_sample_unref (last_sample); } } } stats: if (g_object_class_find_property (payobjclass, "stats")) { g_object_get (priv->payloader, "stats", &stats, NULL); if (stats == NULL) goto no_stats; if (seq) gst_structure_get_uint (stats, "seqnum", seq); if (rtptime) gst_structure_get_uint (stats, "timestamp", rtptime); if (running_time) gst_structure_get_clock_time (stats, "running-time", running_time); if (clock_rate) { gst_structure_get_uint (stats, "clock-rate", clock_rate); if (*clock_rate == 0 && running_time) *running_time = GST_CLOCK_TIME_NONE; } gst_structure_free (stats); } else { if (!g_object_class_find_property (payobjclass, "seqnum") || !g_object_class_find_property (payobjclass, "timestamp")) goto no_stats; if (seq) g_object_get (priv->payloader, "seqnum", seq, NULL); if (rtptime) g_object_get (priv->payloader, "timestamp", rtptime, NULL); if (running_time) *running_time = GST_CLOCK_TIME_NONE; } done: g_mutex_unlock (&priv->lock); return TRUE; /* ERRORS */ no_stats: { GST_WARNING ("Could not get payloader stats"); g_mutex_unlock (&priv->lock); return FALSE; } } /** * gst_rtsp_stream_get_caps: * @stream: a #GstRTSPStream * * Retrieve the current caps of @stream. * * Returns: (transfer full): the #GstCaps of @stream. use gst_caps_unref() * after usage. */ GstCaps * gst_rtsp_stream_get_caps (GstRTSPStream * stream) { GstRTSPStreamPrivate *priv; GstCaps *result; g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL); priv = stream->priv; g_mutex_lock (&priv->lock); if ((result = priv->caps)) gst_caps_ref (result); g_mutex_unlock (&priv->lock); return result; } /** * gst_rtsp_stream_recv_rtp: * @stream: a #GstRTSPStream * @buffer: (transfer full): a #GstBuffer * * Handle an RTP buffer for the stream. This method is usually called when a * message has been received from a client using the TCP transport. * * This function takes ownership of @buffer. * * Returns: a GstFlowReturn. */ GstFlowReturn gst_rtsp_stream_recv_rtp (GstRTSPStream * stream, GstBuffer * buffer) { GstRTSPStreamPrivate *priv; GstFlowReturn ret; GstElement *element; g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR); priv = stream->priv; g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR); g_return_val_if_fail (priv->joined_bin != NULL, FALSE); g_mutex_lock (&priv->lock); if (priv->appsrc[0]) element = gst_object_ref (priv->appsrc[0]); else element = NULL; g_mutex_unlock (&priv->lock); if (element) { if (priv->appsrc_base_time[0] == -1) { /* Take current running_time. This timestamp will be put on * the first buffer of each stream because we are a live source and so we * timestamp with the running_time. When we are dealing with TCP, we also * only timestamp the first buffer (using the DISCONT flag) because a server * typically bursts data, for which we don't want to compensate by speeding * up the media. The other timestamps will be interpollated from this one * using the RTP timestamps. */ GST_OBJECT_LOCK (element); if (GST_ELEMENT_CLOCK (element)) { GstClockTime now; GstClockTime base_time; now = gst_clock_get_time (GST_ELEMENT_CLOCK (element)); base_time = GST_ELEMENT_CAST (element)->base_time; priv->appsrc_base_time[0] = now - base_time; GST_BUFFER_TIMESTAMP (buffer) = priv->appsrc_base_time[0]; GST_DEBUG ("stream %p: first buffer at time %" GST_TIME_FORMAT ", base %" GST_TIME_FORMAT, stream, GST_TIME_ARGS (now), GST_TIME_ARGS (base_time)); } GST_OBJECT_UNLOCK (element); } ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer); gst_object_unref (element); } else { ret = GST_FLOW_OK; } return ret; } /** * gst_rtsp_stream_recv_rtcp: * @stream: a #GstRTSPStream * @buffer: (transfer full): a #GstBuffer * * Handle an RTCP buffer for the stream. This method is usually called when a * message has been received from a client using the TCP transport. * * This function takes ownership of @buffer. * * Returns: a GstFlowReturn. */ GstFlowReturn gst_rtsp_stream_recv_rtcp (GstRTSPStream * stream, GstBuffer * buffer) { GstRTSPStreamPrivate *priv; GstFlowReturn ret; GstElement *element; g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR); priv = stream->priv; g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR); if (priv->joined_bin == NULL) { gst_buffer_unref (buffer); return GST_FLOW_NOT_LINKED; } g_mutex_lock (&priv->lock); if (priv->appsrc[1]) element = gst_object_ref (priv->appsrc[1]); else element = NULL; g_mutex_unlock (&priv->lock); if (element) { if (priv->appsrc_base_time[1] == -1) { /* Take current running_time. This timestamp will be put on * the first buffer of each stream because we are a live source and so we * timestamp with the running_time. When we are dealing with TCP, we also * only timestamp the first buffer (using the DISCONT flag) because a server * typically bursts data, for which we don't want to compensate by speeding * up the media. The other timestamps will be interpollated from this one * using the RTP timestamps. */ GST_OBJECT_LOCK (element); if (GST_ELEMENT_CLOCK (element)) { GstClockTime now; GstClockTime base_time; now = gst_clock_get_time (GST_ELEMENT_CLOCK (element)); base_time = GST_ELEMENT_CAST (element)->base_time; priv->appsrc_base_time[1] = now - base_time; GST_BUFFER_TIMESTAMP (buffer) = priv->appsrc_base_time[1]; GST_DEBUG ("stream %p: first buffer at time %" GST_TIME_FORMAT ", base %" GST_TIME_FORMAT, stream, GST_TIME_ARGS (now), GST_TIME_ARGS (base_time)); } GST_OBJECT_UNLOCK (element); } ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer); gst_object_unref (element); } else { ret = GST_FLOW_OK; gst_buffer_unref (buffer); } return ret; } /* must be called with lock */ static gboolean update_transport (GstRTSPStream * stream, GstRTSPStreamTransport * trans, gboolean add) { GstRTSPStreamPrivate *priv = stream->priv; const GstRTSPTransport *tr; tr = gst_rtsp_stream_transport_get_transport (trans); switch (tr->lower_transport) { case GST_RTSP_LOWER_TRANS_UDP_MCAST: { if (add) { if (!check_mcast_part_for_transport (stream, tr)) goto mcast_error; priv->transports = g_list_prepend (priv->transports, trans); } else { priv->transports = g_list_remove (priv->transports, trans); } break; } case GST_RTSP_LOWER_TRANS_UDP: { gchar *dest; gint min, max; guint ttl = 0; dest = tr->destination; if (tr->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) { min = tr->port.min; max = tr->port.max; ttl = tr->ttl; } else if (priv->client_side) { /* In client side mode the 'destination' is the RTSP server, so send * to those ports */ min = tr->server_port.min; max = tr->server_port.max; } else { min = tr->client_port.min; max = tr->client_port.max; } if (add) { if (ttl > 0) { GST_INFO ("setting ttl-mc %d", ttl); if (priv->udpsink[0]) g_object_set (G_OBJECT (priv->udpsink[0]), "ttl-mc", ttl, NULL); g_object_set (G_OBJECT (priv->udpsink[1]), "ttl-mc", ttl, NULL); } GST_INFO ("adding %s:%d-%d", dest, min, max); if (priv->udpsink[0]) g_signal_emit_by_name (priv->udpsink[0], "add", dest, min, NULL); g_signal_emit_by_name (priv->udpsink[1], "add", dest, max, NULL); priv->transports = g_list_prepend (priv->transports, trans); } else { GST_INFO ("removing %s:%d-%d", dest, min, max); if (priv->udpsink[0]) g_signal_emit_by_name (priv->udpsink[0], "remove", dest, min, NULL); g_signal_emit_by_name (priv->udpsink[1], "remove", dest, max, NULL); priv->transports = g_list_remove (priv->transports, trans); } priv->transports_cookie++; break; } case GST_RTSP_LOWER_TRANS_TCP: if (add) { GST_INFO ("adding TCP %s", tr->destination); priv->transports = g_list_prepend (priv->transports, trans); } else { GST_INFO ("removing TCP %s", tr->destination); priv->transports = g_list_remove (priv->transports, trans); } priv->transports_cookie++; break; default: goto unknown_transport; } return TRUE; /* ERRORS */ unknown_transport: { GST_INFO ("Unknown transport %d", tr->lower_transport); return FALSE; } mcast_error: { return FALSE; } } /** * gst_rtsp_stream_add_transport: * @stream: a #GstRTSPStream * @trans: (transfer none): a #GstRTSPStreamTransport * * Add the transport in @trans to @stream. The media of @stream will * then also be send to the values configured in @trans. * * @stream must be joined to a bin. * * @trans must contain a valid #GstRTSPTransport. * * Returns: %TRUE if @trans was added */ gboolean gst_rtsp_stream_add_transport (GstRTSPStream * stream, GstRTSPStreamTransport * trans) { GstRTSPStreamPrivate *priv; gboolean res; g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE); priv = stream->priv; g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE); g_return_val_if_fail (priv->joined_bin != NULL, FALSE); g_mutex_lock (&priv->lock); res = update_transport (stream, trans, TRUE); g_mutex_unlock (&priv->lock); return res; } /** * gst_rtsp_stream_remove_transport: * @stream: a #GstRTSPStream * @trans: (transfer none): a #GstRTSPStreamTransport * * Remove the transport in @trans from @stream. The media of @stream will * not be sent to the values configured in @trans. * * @stream must be joined to a bin. * * @trans must contain a valid #GstRTSPTransport. * * Returns: %TRUE if @trans was removed */ gboolean gst_rtsp_stream_remove_transport (GstRTSPStream * stream, GstRTSPStreamTransport * trans) { GstRTSPStreamPrivate *priv; gboolean res; g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE); priv = stream->priv; g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE); g_return_val_if_fail (priv->joined_bin != NULL, FALSE); g_mutex_lock (&priv->lock); res = update_transport (stream, trans, FALSE); g_mutex_unlock (&priv->lock); return res; } /** * gst_rtsp_stream_update_crypto: * @stream: a #GstRTSPStream * @ssrc: the SSRC * @crypto: (transfer none) (allow-none): a #GstCaps with crypto info * * Update the new crypto information for @ssrc in @stream. If information * for @ssrc did not exist, it will be added. If information * for @ssrc existed, it will be replaced. If @crypto is %NULL, it will * be removed from @stream. * * Returns: %TRUE if @crypto could be updated */ gboolean gst_rtsp_stream_update_crypto (GstRTSPStream * stream, guint ssrc, GstCaps * crypto) { GstRTSPStreamPrivate *priv; g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE); g_return_val_if_fail (crypto == NULL || GST_IS_CAPS (crypto), FALSE); priv = stream->priv; GST_DEBUG_OBJECT (stream, "update key for %08x", ssrc); g_mutex_lock (&priv->lock); if (crypto) g_hash_table_insert (priv->keys, GINT_TO_POINTER (ssrc), gst_caps_ref (crypto)); else g_hash_table_remove (priv->keys, GINT_TO_POINTER (ssrc)); g_mutex_unlock (&priv->lock); return TRUE; } /** * gst_rtsp_stream_get_rtp_socket: * @stream: a #GstRTSPStream * @family: the socket family * * Get the RTP socket from @stream for a @family. * * @stream must be joined to a bin. * * Returns: (transfer full) (nullable): the RTP socket or %NULL if no * socket could be allocated for @family. Unref after usage */ GSocket * gst_rtsp_stream_get_rtp_socket (GstRTSPStream * stream, GSocketFamily family) { GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream); GSocket *socket; const gchar *name; g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL); g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 || family == G_SOCKET_FAMILY_IPV6, NULL); g_return_val_if_fail (priv->udpsink[0], NULL); if (family == G_SOCKET_FAMILY_IPV6) name = "socket-v6"; else name = "socket"; g_object_get (priv->udpsink[0], name, &socket, NULL); return socket; } /** * gst_rtsp_stream_get_rtcp_socket: * @stream: a #GstRTSPStream * @family: the socket family * * Get the RTCP socket from @stream for a @family. * * @stream must be joined to a bin. * * Returns: (transfer full) (nullable): the RTCP socket or %NULL if no * socket could be allocated for @family. Unref after usage */ GSocket * gst_rtsp_stream_get_rtcp_socket (GstRTSPStream * stream, GSocketFamily family) { GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream); GSocket *socket; const gchar *name; g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL); g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 || family == G_SOCKET_FAMILY_IPV6, NULL); g_return_val_if_fail (priv->udpsink[1], NULL); if (family == G_SOCKET_FAMILY_IPV6) name = "socket-v6"; else name = "socket"; g_object_get (priv->udpsink[1], name, &socket, NULL); return socket; } /** * gst_rtsp_stream_get_rtp_multicast_socket: * @stream: a #GstRTSPStream * @family: the socket family * * Get the multicast RTP socket from @stream for a @family. * * Returns: (transfer full) (nullable): the multicast RTP socket or %NULL if no * socket could be allocated for @family. Unref after usage */ GSocket * gst_rtsp_stream_get_rtp_multicast_socket (GstRTSPStream * stream, GSocketFamily family) { GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream); GSocket *socket; const gchar *name; g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL); g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 || family == G_SOCKET_FAMILY_IPV6, NULL); g_return_val_if_fail (priv->mcast_udpsink[0], NULL); if (family == G_SOCKET_FAMILY_IPV6) name = "socket-v6"; else name = "socket"; g_object_get (priv->mcast_udpsink[0], name, &socket, NULL); return socket; } /** * gst_rtsp_stream_get_rtcp_multicast_socket: * @stream: a #GstRTSPStream * @family: the socket family * * Get the multicast RTCP socket from @stream for a @family. * * Returns: (transfer full) (nullable): the multicast RTCP socket or %NULL if no * socket could be allocated for @family. Unref after usage */ GSocket * gst_rtsp_stream_get_rtcp_multicast_socket (GstRTSPStream * stream, GSocketFamily family) { GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream); GSocket *socket; const gchar *name; g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL); g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 || family == G_SOCKET_FAMILY_IPV6, NULL); g_return_val_if_fail (priv->mcast_udpsink[1], NULL); if (family == G_SOCKET_FAMILY_IPV6) name = "socket-v6"; else name = "socket"; g_object_get (priv->mcast_udpsink[1], name, &socket, NULL); return socket; } /** * gst_rtsp_stream_set_seqnum: * @stream: a #GstRTSPStream * @seqnum: a new sequence number * * Configure the sequence number in the payloader of @stream to @seqnum. */ void gst_rtsp_stream_set_seqnum_offset (GstRTSPStream * stream, guint16 seqnum) { GstRTSPStreamPrivate *priv; g_return_if_fail (GST_IS_RTSP_STREAM (stream)); priv = stream->priv; g_object_set (G_OBJECT (priv->payloader), "seqnum-offset", seqnum, NULL); } /** * gst_rtsp_stream_get_seqnum: * @stream: a #GstRTSPStream * * Get the configured sequence number in the payloader of @stream. * * Returns: the sequence number of the payloader. */ guint16 gst_rtsp_stream_get_current_seqnum (GstRTSPStream * stream) { GstRTSPStreamPrivate *priv; guint seqnum; g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0); priv = stream->priv; g_object_get (G_OBJECT (priv->payloader), "seqnum", &seqnum, NULL); return seqnum; } /** * gst_rtsp_stream_transport_filter: * @stream: a #GstRTSPStream * @func: (scope call) (allow-none): a callback * @user_data: (closure): user data passed to @func * * Call @func for each transport managed by @stream. The result value of @func * determines what happens to the transport. @func will be called with @stream * locked so no further actions on @stream can be performed from @func. * * If @func returns #GST_RTSP_FILTER_REMOVE, the transport will be removed from * @stream. * * If @func returns #GST_RTSP_FILTER_KEEP, the transport will remain in @stream. * * If @func returns #GST_RTSP_FILTER_REF, the transport will remain in @stream but * will also be added with an additional ref to the result #GList of this * function.. * * When @func is %NULL, #GST_RTSP_FILTER_REF will be assumed for each transport. * * Returns: (element-type GstRTSPStreamTransport) (transfer full): a #GList with all * transports for which @func returned #GST_RTSP_FILTER_REF. After usage, each * element in the #GList should be unreffed before the list is freed. */ GList * gst_rtsp_stream_transport_filter (GstRTSPStream * stream, GstRTSPStreamTransportFilterFunc func, gpointer user_data) { GstRTSPStreamPrivate *priv; GList *result, *walk, *next; GHashTable *visited = NULL; guint cookie; g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL); priv = stream->priv; result = NULL; if (func) visited = g_hash_table_new_full (NULL, NULL, g_object_unref, NULL); g_mutex_lock (&priv->lock); restart: cookie = priv->transports_cookie; for (walk = priv->transports; walk; walk = next) { GstRTSPStreamTransport *trans = walk->data; GstRTSPFilterResult res; gboolean changed; next = g_list_next (walk); if (func) { /* only visit each transport once */ if (g_hash_table_contains (visited, trans)) continue; g_hash_table_add (visited, g_object_ref (trans)); g_mutex_unlock (&priv->lock); res = func (stream, trans, user_data); g_mutex_lock (&priv->lock); } else res = GST_RTSP_FILTER_REF; changed = (cookie != priv->transports_cookie); switch (res) { case GST_RTSP_FILTER_REMOVE: update_transport (stream, trans, FALSE); break; case GST_RTSP_FILTER_REF: result = g_list_prepend (result, g_object_ref (trans)); break; case GST_RTSP_FILTER_KEEP: default: break; } if (changed) goto restart; } g_mutex_unlock (&priv->lock); if (func) g_hash_table_unref (visited); return result; } static GstPadProbeReturn pad_blocking (GstPad * pad, GstPadProbeInfo * info, gpointer user_data) { GstRTSPStreamPrivate *priv; GstRTSPStream *stream; GstBuffer *buffer = NULL; stream = user_data; priv = stream->priv; GST_DEBUG_OBJECT (pad, "now blocking"); g_mutex_lock (&priv->lock); priv->blocking = TRUE; if ((info->type & GST_PAD_PROBE_TYPE_BUFFER)) { buffer = gst_pad_probe_info_get_buffer (info); } else if ((info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST)) { GstBufferList *list = gst_pad_probe_info_get_buffer_list (info); buffer = gst_buffer_list_get (list, 0); } else { g_assert_not_reached (); } g_assert (buffer); priv->position = GST_BUFFER_TIMESTAMP (buffer); GST_DEBUG_OBJECT (stream, "buffer position: %" GST_TIME_FORMAT, GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer))); g_mutex_unlock (&priv->lock); gst_element_post_message (priv->payloader, gst_message_new_element (GST_OBJECT_CAST (priv->payloader), gst_structure_new_empty ("GstRTSPStreamBlocking"))); return GST_PAD_PROBE_OK; } static void set_blocked (GstRTSPStream * stream, gboolean blocked) { GstRTSPStreamPrivate *priv; int i; GST_DEBUG_OBJECT (stream, "blocked: %d", blocked); priv = stream->priv; if (blocked) { for (i = 0; i < 2; i++) { if (priv->blocked_id[i] != 0) continue; if (priv->send_src[i]) { priv->blocking = FALSE; priv->blocked_id[i] = gst_pad_add_probe (priv->send_src[i], GST_PAD_PROBE_TYPE_BLOCK | GST_PAD_PROBE_TYPE_BUFFER | GST_PAD_PROBE_TYPE_BUFFER_LIST, pad_blocking, g_object_ref (stream), g_object_unref); } } } else { for (i = 0; i < 2; i++) { if (priv->blocked_id[i] != 0) { gst_pad_remove_probe (priv->send_src[i], priv->blocked_id[i]); priv->blocked_id[i] = 0; } } priv->blocking = FALSE; } } /** * gst_rtsp_stream_set_blocked: * @stream: a #GstRTSPStream * @blocked: boolean indicating we should block or unblock * * Blocks or unblocks the dataflow on @stream. * * Returns: %TRUE on success */ gboolean gst_rtsp_stream_set_blocked (GstRTSPStream * stream, gboolean blocked) { GstRTSPStreamPrivate *priv; g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE); priv = stream->priv; g_mutex_lock (&priv->lock); set_blocked (stream, blocked); g_mutex_unlock (&priv->lock); return TRUE; } /** * gst_rtsp_stream_ublock_linked: * @stream: a #GstRTSPStream * * Unblocks the dataflow on @stream if it is linked. * * Returns: %TRUE on success */ gboolean gst_rtsp_stream_unblock_linked (GstRTSPStream * stream) { GstRTSPStreamPrivate *priv; g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE); priv = stream->priv; g_mutex_lock (&priv->lock); if (priv->send_src[0] && gst_pad_is_linked (priv->send_src[0])) set_blocked (stream, FALSE); g_mutex_unlock (&priv->lock); return TRUE; } /** * gst_rtsp_stream_is_blocking: * @stream: a #GstRTSPStream * * Check if @stream is blocking on a #GstBuffer. * * Returns: %TRUE if @stream is blocking */ gboolean gst_rtsp_stream_is_blocking (GstRTSPStream * stream) { GstRTSPStreamPrivate *priv; gboolean result; g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE); priv = stream->priv; g_mutex_lock (&priv->lock); result = priv->blocking; g_mutex_unlock (&priv->lock); return result; } /** * gst_rtsp_stream_query_position: * @stream: a #GstRTSPStream * @position: current position of a #GstRTSPStream * * Query the position of the stream in %GST_FORMAT_TIME. This only considers * the RTP parts of the pipeline and not the RTCP parts. * * Returns: %TRUE if the position could be queried */ gboolean gst_rtsp_stream_query_position (GstRTSPStream * stream, gint64 * position) { GstRTSPStreamPrivate *priv; GstElement *sink; GstPad *pad = NULL; g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE); /* query position: if no sinks have been added yet, * we obtain the position from the pad otherwise we query the sinks */ priv = stream->priv; g_mutex_lock (&priv->lock); /* depending on the transport type, it should query corresponding sink */ if (priv->protocols & GST_RTSP_LOWER_TRANS_UDP) sink = priv->udpsink[0]; else if (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST) sink = priv->mcast_udpsink[0]; else sink = priv->appsink[0]; if (sink) { gst_object_ref (sink); } else if (priv->send_src[0]) { pad = gst_object_ref (priv->send_src[0]); } else { g_mutex_unlock (&priv->lock); GST_WARNING_OBJECT (stream, "Couldn't obtain postion: erroneous pipeline"); return FALSE; } g_mutex_unlock (&priv->lock); if (sink) { if (!gst_element_query_position (sink, GST_FORMAT_TIME, position)) { GST_WARNING_OBJECT (stream, "Couldn't obtain postion: position query failed"); gst_object_unref (sink); return FALSE; } gst_object_unref (sink); } else if (pad) { GstEvent *event; const GstSegment *segment; event = gst_pad_get_sticky_event (pad, GST_EVENT_SEGMENT, 0); if (!event) { GST_WARNING_OBJECT (stream, "Couldn't obtain postion: no segment event"); gst_object_unref (pad); return FALSE; } gst_event_parse_segment (event, &segment); if (segment->format != GST_FORMAT_TIME) { *position = -1; } else { g_mutex_lock (&priv->lock); *position = priv->position; g_mutex_unlock (&priv->lock); *position = gst_segment_to_stream_time (segment, GST_FORMAT_TIME, *position); } gst_event_unref (event); gst_object_unref (pad); } return TRUE; } /** * gst_rtsp_stream_query_stop: * @stream: a #GstRTSPStream * @stop: current stop of a #GstRTSPStream * * Query the stop of the stream in %GST_FORMAT_TIME. This only considers * the RTP parts of the pipeline and not the RTCP parts. * * Returns: %TRUE if the stop could be queried */ gboolean gst_rtsp_stream_query_stop (GstRTSPStream * stream, gint64 * stop) { GstRTSPStreamPrivate *priv; GstElement *sink; GstPad *pad = NULL; g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE); /* query stop position: if no sinks have been added yet, * we obtain the stop position from the pad otherwise we query the sinks */ priv = stream->priv; g_mutex_lock (&priv->lock); /* depending on the transport type, it should query corresponding sink */ if (priv->protocols & GST_RTSP_LOWER_TRANS_UDP) sink = priv->udpsink[0]; else if (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST) sink = priv->mcast_udpsink[0]; else sink = priv->appsink[0]; if (sink) { gst_object_ref (sink); } else if (priv->send_src[0]) { pad = gst_object_ref (priv->send_src[0]); } else { g_mutex_unlock (&priv->lock); GST_WARNING_OBJECT (stream, "Couldn't obtain stop: erroneous pipeline"); return FALSE; } g_mutex_unlock (&priv->lock); if (sink) { GstQuery *query; GstFormat format; query = gst_query_new_segment (GST_FORMAT_TIME); if (!gst_element_query (sink, query)) { GST_WARNING_OBJECT (stream, "Couldn't obtain stop: element query failed"); gst_query_unref (query); gst_object_unref (sink); return FALSE; } gst_query_parse_segment (query, NULL, &format, NULL, stop); if (format != GST_FORMAT_TIME) *stop = -1; gst_query_unref (query); gst_object_unref (sink); } else if (pad) { GstEvent *event; const GstSegment *segment; event = gst_pad_get_sticky_event (pad, GST_EVENT_SEGMENT, 0); if (!event) { GST_WARNING_OBJECT (stream, "Couldn't obtain stop: no segment event"); gst_object_unref (pad); return FALSE; } gst_event_parse_segment (event, &segment); if (segment->format != GST_FORMAT_TIME) { *stop = -1; } else { *stop = segment->stop; if (*stop == -1) *stop = segment->duration; else *stop = gst_segment_to_stream_time (segment, GST_FORMAT_TIME, *stop); } gst_event_unref (event); gst_object_unref (pad); } return TRUE; } /** * gst_rtsp_stream_seekable: * @stream: a #GstRTSPStream * * Checks whether the individual @stream is seekable. * * Returns: %TRUE if @stream is seekable, else %FALSE. */ gboolean gst_rtsp_stream_seekable (GstRTSPStream * stream) { GstRTSPStreamPrivate *priv; GstPad *pad = NULL; GstQuery *query = NULL; gboolean seekable = FALSE; g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE); /* query stop position: if no sinks have been added yet, * we obtain the stop position from the pad otherwise we query the sinks */ priv = stream->priv; g_mutex_lock (&priv->lock); /* depending on the transport type, it should query corresponding sink */ if (priv->srcpad) { pad = gst_object_ref (priv->srcpad); } else { g_mutex_unlock (&priv->lock); GST_WARNING_OBJECT (stream, "Pad not available, can't query seekability"); goto beach; } g_mutex_unlock (&priv->lock); query = gst_query_new_seeking (GST_FORMAT_TIME); if (!gst_pad_query (pad, query)) { GST_WARNING_OBJECT (stream, "seeking query failed"); goto beach; } gst_query_parse_seeking (query, NULL, &seekable, NULL, NULL); beach: if (pad) gst_object_unref (pad); if (query) gst_query_unref (query); GST_DEBUG_OBJECT (stream, "Returning %d", seekable); return seekable; } /** * gst_rtsp_stream_complete_stream: * @stream: a #GstRTSPStream * @transport: a #GstRTSPTransport * * Add a receiver and sender part to the pipeline based on the transport from * SETUP. * * Returns: %TRUE if the stream has been sucessfully updated. */ gboolean gst_rtsp_stream_complete_stream (GstRTSPStream * stream, const GstRTSPTransport * transport) { GstRTSPStreamPrivate *priv; g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE); priv = stream->priv; GST_DEBUG_OBJECT (stream, "complete stream"); g_mutex_lock (&priv->lock); if (!(priv->protocols & transport->lower_transport)) goto unallowed_transport; if (!create_receiver_part (stream, transport)) goto create_receiver_error; /* in the RECORD case, we only add RTCP sender part */ if (!create_sender_part (stream, transport)) goto create_sender_error; priv->is_complete = TRUE; g_mutex_unlock (&priv->lock); GST_DEBUG_OBJECT (stream, "pipeline sucsessfully updated"); return TRUE; create_receiver_error: create_sender_error: unallowed_transport: { g_mutex_unlock (&priv->lock); return FALSE; } } /** * gst_rtsp_stream_is_complete: * @stream: a #GstRTSPStream * * Checks whether the stream is complete, contains the receiver and the sender * parts. As the stream contains sink(s) element(s), it's possible to perform * seek operations on it. * * Returns: %TRUE if the stream contains at least one sink element. */ gboolean gst_rtsp_stream_is_complete (GstRTSPStream * stream) { GstRTSPStreamPrivate *priv; gboolean ret = FALSE; g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE); priv = stream->priv; g_mutex_lock (&priv->lock); ret = priv->is_complete; g_mutex_unlock (&priv->lock); return ret; }