diff --git a/gst/rtsp-server/rtsp-media-factory.h b/gst/rtsp-server/rtsp-media-factory.h index f44192c590..ef3f6e8103 100644 --- a/gst/rtsp-server/rtsp-media-factory.h +++ b/gst/rtsp-server/rtsp-media-factory.h @@ -77,7 +77,6 @@ struct _GstRTSPMediaFactory { * pay%d to create the streams. * @configure: configure the media created with @construct. The default * implementation will configure the 'shared' property of the media. - * @handle_message: Handle a bus message for @media created from @factory. * * The #GstRTSPMediaFactory class structure. */ @@ -89,9 +88,6 @@ struct _GstRTSPMediaFactoryClass { GstElement * (*get_element) (GstRTSPMediaFactory *factory, const GstRTSPUrl *url); GstRTSPMedia * (*construct) (GstRTSPMediaFactory *factory, const GstRTSPUrl *url); void (*configure) (GstRTSPMediaFactory *factory, GstRTSPMedia *media); - - void (*handle_message) (GstRTSPMediaFactory *factory, GstRTSPMedia *media, - GstMessage *message); }; GType gst_rtsp_media_factory_get_type (void); diff --git a/gst/rtsp-server/rtsp-media.c b/gst/rtsp-server/rtsp-media.c index d0f8ba0397..2768a808a9 100644 --- a/gst/rtsp-server/rtsp-media.c +++ b/gst/rtsp-server/rtsp-media.c @@ -34,12 +34,16 @@ static void gst_rtsp_media_set_property (GObject *object, guint propid, const GValue *value, GParamSpec *pspec); static void gst_rtsp_media_finalize (GObject * obj); +static gpointer do_loop (GstRTSPMediaClass *klass); +static gboolean default_handle_message (GstRTSPMedia *media, GstMessage *message); + G_DEFINE_TYPE (GstRTSPMedia, gst_rtsp_media, G_TYPE_OBJECT); static void gst_rtsp_media_class_init (GstRTSPMediaClass * klass) { GObjectClass *gobject_class; + GError *error = NULL; gobject_class = G_OBJECT_CLASS (klass); @@ -50,6 +54,15 @@ gst_rtsp_media_class_init (GstRTSPMediaClass * klass) g_object_class_install_property (gobject_class, PROP_SHARED, g_param_spec_boolean ("shared", "Shared", "If this media pipeline can be shared", DEFAULT_SHARED, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + + klass->context = g_main_context_new (); + klass->loop = g_main_loop_new (klass->context, TRUE); + + klass->thread = g_thread_create ((GThreadFunc) do_loop, klass, TRUE, &error); + if (error != NULL) { + g_critical ("could not start bus thread: %s", error->message); + } + klass->handle_message = default_handle_message; } static void @@ -57,6 +70,8 @@ gst_rtsp_media_init (GstRTSPMedia * media) { media->streams = g_array_new (FALSE, TRUE, sizeof (GstRTSPMediaStream *)); media->complete = FALSE; + media->is_live = FALSE; + media->buffering = FALSE; } static void @@ -87,6 +102,11 @@ gst_rtsp_media_finalize (GObject * obj) } g_array_free (media->streams, TRUE); + if (media->source) { + g_source_destroy (media->source); + g_source_unref (media->source); + } + if (media->pipeline) gst_object_unref (media->pipeline); @@ -123,6 +143,16 @@ gst_rtsp_media_set_property (GObject *object, guint propid, } } +static gpointer +do_loop (GstRTSPMediaClass *klass) +{ + g_message ("enter mainloop"); + g_main_loop_run (klass->loop); + g_message ("exit mainloop"); + + return NULL; +} + /** * gst_rtsp_media_new: * @@ -449,6 +479,23 @@ setup_stream (GstRTSPMediaStream *stream, guint idx, GstRTSPMedia *media) return TRUE; } +static void +unlock_streams (GstRTSPMedia *media) +{ + guint i, n_streams; + + /* unlock the udp src elements */ + n_streams = gst_rtsp_media_n_streams (media); + for (i = 0; i < n_streams; i++) { + GstRTSPMediaStream *stream; + + stream = gst_rtsp_media_get_stream (media, i); + + gst_element_set_locked_state (stream->udpsrc[0], FALSE); + gst_element_set_locked_state (stream->udpsrc[1], FALSE); + } +} + static void collect_media_stats (GstRTSPMedia *media) { @@ -474,6 +521,92 @@ collect_media_stats (GstRTSPMedia *media) } } +static gboolean +default_handle_message (GstRTSPMedia *media, GstMessage *message) +{ + GstMessageType type; + + type = GST_MESSAGE_TYPE (message); + + switch (type) { + case GST_MESSAGE_STATE_CHANGED: + break; + case GST_MESSAGE_BUFFERING: + { + gint percent; + + gst_message_parse_buffering (message, &percent); + + /* no state management needed for live pipelines */ + if (media->is_live) + break; + + if (percent == 100) { + /* a 100% message means buffering is done */ + media->buffering = FALSE; + /* if the desired state is playing, go back */ + if (media->target_state == GST_STATE_PLAYING) { + g_message ("Buffering done, setting pipeline to PLAYING"); + gst_element_set_state (media->pipeline, GST_STATE_PLAYING); + } + else { + g_message ("Buffering done"); + } + } else { + /* buffering busy */ + if (media->buffering == FALSE) { + if (media->target_state == GST_STATE_PLAYING) { + /* we were not buffering but PLAYING, PAUSE the pipeline. */ + g_message ("Buffering, setting pipeline to PAUSED ..."); + gst_element_set_state (media->pipeline, GST_STATE_PAUSED); + } + else { + g_message ("Buffering ..."); + } + } + media->buffering = TRUE; + } + break; + } + case GST_MESSAGE_LATENCY: + { + gst_bin_recalculate_latency (GST_BIN_CAST (media->pipeline)); + break; + } + case GST_MESSAGE_ERROR: + { + GError *gerror; + gchar *debug; + + gst_message_parse_error (message, &gerror, &debug); + g_warning ("%p: got error %s (%s)", media, gerror->message, debug); + g_error_free (gerror); + g_free (debug); + break; + } + default: + g_message ("%p: got message type %s", media, gst_message_type_get_name (type)); + break; + } + return TRUE; +} + +static gboolean +bus_message (GstBus *bus, GstMessage *message, GstRTSPMedia *media) +{ + GstRTSPMediaClass *klass; + gboolean ret; + + klass = GST_RTSP_MEDIA_GET_CLASS (media); + + if (klass->handle_message) + ret = klass->handle_message (media, message); + else + ret = FALSE; + + return ret; +} + /** * gst_rtsp_media_prepare: * @obj: a #GstRTSPMedia @@ -488,6 +621,8 @@ gst_rtsp_media_prepare (GstRTSPMedia *media) { GstStateChangeReturn ret; guint i, n_streams; + GstRTSPMediaClass *klass; + GstBus *bus; if (media->prepared) goto was_prepared; @@ -495,6 +630,7 @@ gst_rtsp_media_prepare (GstRTSPMedia *media) g_message ("preparing media %p", media); media->pipeline = gst_pipeline_new ("media-pipeline"); + bus = gst_pipeline_get_bus (GST_PIPELINE_CAST (media->pipeline)); gst_bin_add (GST_BIN_CAST (media->pipeline), media->element); @@ -515,6 +651,7 @@ gst_rtsp_media_prepare (GstRTSPMedia *media) /* first go to PAUSED */ ret = gst_element_set_state (media->pipeline, GST_STATE_PAUSED); + media->target_state = GST_STATE_PAUSED; switch (ret) { case GST_STATE_CHANGE_SUCCESS: @@ -524,6 +661,7 @@ gst_rtsp_media_prepare (GstRTSPMedia *media) case GST_STATE_CHANGE_NO_PREROLL: /* we need to go to PLAYING */ g_message ("live media %p", media); + media->is_live = TRUE; ret = gst_element_set_state (media->pipeline, GST_STATE_PLAYING); break; case GST_STATE_CHANGE_FAILURE: @@ -539,18 +677,21 @@ gst_rtsp_media_prepare (GstRTSPMedia *media) /* collect stats about the media */ collect_media_stats (media); - /* unlock the udp src elements */ - n_streams = gst_rtsp_media_n_streams (media); - for (i = 0; i < n_streams; i++) { - GstRTSPMediaStream *stream; - - stream = gst_rtsp_media_get_stream (media, i); - - gst_element_set_locked_state (stream->udpsrc[0], FALSE); - gst_element_set_locked_state (stream->udpsrc[1], FALSE); - } + /* unlock the streams so that they follow the state changes from now on */ + unlock_streams (media); g_message ("object %p is prerolled", media); + + /* add the pipeline bus to our custom mainloop */ + bus = gst_pipeline_get_bus (GST_PIPELINE_CAST (media->pipeline)); + media->source = gst_bus_create_watch (bus); + gst_object_unref (bus); + + g_source_set_callback (media->source, (GSourceFunc) bus_message, media, NULL); + + klass = GST_RTSP_MEDIA_GET_CLASS (media); + media->id = g_source_attach (media->source, klass->context); + media->prepared = TRUE; return TRUE; @@ -563,7 +704,33 @@ was_prepared: /* ERRORS */ state_failed: { + GstMessage *message; + g_message ("state change failed for media %p", media); + while ((message = gst_bus_pop (bus))) { + GstMessageType type; + + type = GST_MESSAGE_TYPE (message); + switch (type) { + case GST_MESSAGE_ERROR: + { + GError *gerror; + gchar *debug; + + gst_message_parse_error (message, &gerror, &debug); + g_warning ("%p: got error %s (%s)", media, gerror->message, debug); + g_error_free (gerror); + g_free (debug); + break; + } + default: + break; + } + gst_message_unref (message); + } + unlock_streams (media); + gst_element_set_state (media->pipeline, GST_STATE_NULL); + gst_object_unref (bus); return FALSE; } } @@ -611,6 +778,7 @@ gst_rtsp_media_play (GstRTSPMedia *media, GArray *transports) } g_message ("playing"); + media->target_state = GST_STATE_PLAYING; ret = gst_element_set_state (media->pipeline, GST_STATE_PLAYING); return TRUE; @@ -659,6 +827,7 @@ gst_rtsp_media_pause (GstRTSPMedia *media, GArray *transports) } g_message ("pause"); + media->target_state = GST_STATE_PAUSED; ret = gst_element_set_state (media->pipeline, GST_STATE_PAUSED); return TRUE; @@ -685,6 +854,7 @@ gst_rtsp_media_stop (GstRTSPMedia *media, GArray *transports) gst_rtsp_media_pause (media, transports); g_message ("stop"); + media->target_state = GST_STATE_NULL; ret = gst_element_set_state (media->pipeline, GST_STATE_NULL); return TRUE; diff --git a/gst/rtsp-server/rtsp-media.h b/gst/rtsp-server/rtsp-media.h index c00b165476..798ceef486 100644 --- a/gst/rtsp-server/rtsp-media.h +++ b/gst/rtsp-server/rtsp-media.h @@ -124,6 +124,12 @@ struct _GstRTSPMedia { /* the pipeline for the media */ GstElement *pipeline; + GSource *source; + guint id; + + gboolean is_live; + gboolean buffering; + GstState target_state; /* RTP session manager */ GstElement *rtpbin; @@ -135,8 +141,24 @@ struct _GstRTSPMedia { GstRTSPTimeRange range; }; +/** + * GstRTSPMediaClass: + * @context: the main context for dispatching messages + * @loop: the mainloop for message. + * @thread: the thread dispatching messages. + * @handle_message: handle a message + * + * The RTSP media class + */ struct _GstRTSPMediaClass { GObjectClass parent_class; + + /* thread for the mainloop */ + GMainContext *context; + GMainLoop *loop; + GThread *thread; + + gboolean (*handle_message) (GstRTSPMedia *media, GstMessage *message); }; GType gst_rtsp_media_get_type (void);