Handle media bus messages

Handle media bus messages in a custom mainloop and dispatch them to the
RTSPMedia objects. Let the default implementation handle some common messages.
This commit is contained in:
Wim Taymans 2009-02-13 16:39:36 +01:00
parent e1154c92d6
commit cd29e2a454
3 changed files with 202 additions and 14 deletions

View file

@ -77,7 +77,6 @@ struct _GstRTSPMediaFactory {
* pay%d to create the streams.
* @configure: configure the media created with @construct. The default
* implementation will configure the 'shared' property of the media.
* @handle_message: Handle a bus message for @media created from @factory.
*
* The #GstRTSPMediaFactory class structure.
*/
@ -89,9 +88,6 @@ struct _GstRTSPMediaFactoryClass {
GstElement * (*get_element) (GstRTSPMediaFactory *factory, const GstRTSPUrl *url);
GstRTSPMedia * (*construct) (GstRTSPMediaFactory *factory, const GstRTSPUrl *url);
void (*configure) (GstRTSPMediaFactory *factory, GstRTSPMedia *media);
void (*handle_message) (GstRTSPMediaFactory *factory, GstRTSPMedia *media,
GstMessage *message);
};
GType gst_rtsp_media_factory_get_type (void);

View file

@ -34,12 +34,16 @@ static void gst_rtsp_media_set_property (GObject *object, guint propid,
const GValue *value, GParamSpec *pspec);
static void gst_rtsp_media_finalize (GObject * obj);
static gpointer do_loop (GstRTSPMediaClass *klass);
static gboolean default_handle_message (GstRTSPMedia *media, GstMessage *message);
G_DEFINE_TYPE (GstRTSPMedia, gst_rtsp_media, G_TYPE_OBJECT);
static void
gst_rtsp_media_class_init (GstRTSPMediaClass * klass)
{
GObjectClass *gobject_class;
GError *error = NULL;
gobject_class = G_OBJECT_CLASS (klass);
@ -50,6 +54,15 @@ gst_rtsp_media_class_init (GstRTSPMediaClass * klass)
g_object_class_install_property (gobject_class, PROP_SHARED,
g_param_spec_boolean ("shared", "Shared", "If this media pipeline can be shared",
DEFAULT_SHARED, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
klass->context = g_main_context_new ();
klass->loop = g_main_loop_new (klass->context, TRUE);
klass->thread = g_thread_create ((GThreadFunc) do_loop, klass, TRUE, &error);
if (error != NULL) {
g_critical ("could not start bus thread: %s", error->message);
}
klass->handle_message = default_handle_message;
}
static void
@ -57,6 +70,8 @@ gst_rtsp_media_init (GstRTSPMedia * media)
{
media->streams = g_array_new (FALSE, TRUE, sizeof (GstRTSPMediaStream *));
media->complete = FALSE;
media->is_live = FALSE;
media->buffering = FALSE;
}
static void
@ -87,6 +102,11 @@ gst_rtsp_media_finalize (GObject * obj)
}
g_array_free (media->streams, TRUE);
if (media->source) {
g_source_destroy (media->source);
g_source_unref (media->source);
}
if (media->pipeline)
gst_object_unref (media->pipeline);
@ -123,6 +143,16 @@ gst_rtsp_media_set_property (GObject *object, guint propid,
}
}
static gpointer
do_loop (GstRTSPMediaClass *klass)
{
g_message ("enter mainloop");
g_main_loop_run (klass->loop);
g_message ("exit mainloop");
return NULL;
}
/**
* gst_rtsp_media_new:
*
@ -449,6 +479,23 @@ setup_stream (GstRTSPMediaStream *stream, guint idx, GstRTSPMedia *media)
return TRUE;
}
static void
unlock_streams (GstRTSPMedia *media)
{
guint i, n_streams;
/* unlock the udp src elements */
n_streams = gst_rtsp_media_n_streams (media);
for (i = 0; i < n_streams; i++) {
GstRTSPMediaStream *stream;
stream = gst_rtsp_media_get_stream (media, i);
gst_element_set_locked_state (stream->udpsrc[0], FALSE);
gst_element_set_locked_state (stream->udpsrc[1], FALSE);
}
}
static void
collect_media_stats (GstRTSPMedia *media)
{
@ -474,6 +521,92 @@ collect_media_stats (GstRTSPMedia *media)
}
}
static gboolean
default_handle_message (GstRTSPMedia *media, GstMessage *message)
{
GstMessageType type;
type = GST_MESSAGE_TYPE (message);
switch (type) {
case GST_MESSAGE_STATE_CHANGED:
break;
case GST_MESSAGE_BUFFERING:
{
gint percent;
gst_message_parse_buffering (message, &percent);
/* no state management needed for live pipelines */
if (media->is_live)
break;
if (percent == 100) {
/* a 100% message means buffering is done */
media->buffering = FALSE;
/* if the desired state is playing, go back */
if (media->target_state == GST_STATE_PLAYING) {
g_message ("Buffering done, setting pipeline to PLAYING");
gst_element_set_state (media->pipeline, GST_STATE_PLAYING);
}
else {
g_message ("Buffering done");
}
} else {
/* buffering busy */
if (media->buffering == FALSE) {
if (media->target_state == GST_STATE_PLAYING) {
/* we were not buffering but PLAYING, PAUSE the pipeline. */
g_message ("Buffering, setting pipeline to PAUSED ...");
gst_element_set_state (media->pipeline, GST_STATE_PAUSED);
}
else {
g_message ("Buffering ...");
}
}
media->buffering = TRUE;
}
break;
}
case GST_MESSAGE_LATENCY:
{
gst_bin_recalculate_latency (GST_BIN_CAST (media->pipeline));
break;
}
case GST_MESSAGE_ERROR:
{
GError *gerror;
gchar *debug;
gst_message_parse_error (message, &gerror, &debug);
g_warning ("%p: got error %s (%s)", media, gerror->message, debug);
g_error_free (gerror);
g_free (debug);
break;
}
default:
g_message ("%p: got message type %s", media, gst_message_type_get_name (type));
break;
}
return TRUE;
}
static gboolean
bus_message (GstBus *bus, GstMessage *message, GstRTSPMedia *media)
{
GstRTSPMediaClass *klass;
gboolean ret;
klass = GST_RTSP_MEDIA_GET_CLASS (media);
if (klass->handle_message)
ret = klass->handle_message (media, message);
else
ret = FALSE;
return ret;
}
/**
* gst_rtsp_media_prepare:
* @obj: a #GstRTSPMedia
@ -488,6 +621,8 @@ gst_rtsp_media_prepare (GstRTSPMedia *media)
{
GstStateChangeReturn ret;
guint i, n_streams;
GstRTSPMediaClass *klass;
GstBus *bus;
if (media->prepared)
goto was_prepared;
@ -495,6 +630,7 @@ gst_rtsp_media_prepare (GstRTSPMedia *media)
g_message ("preparing media %p", media);
media->pipeline = gst_pipeline_new ("media-pipeline");
bus = gst_pipeline_get_bus (GST_PIPELINE_CAST (media->pipeline));
gst_bin_add (GST_BIN_CAST (media->pipeline), media->element);
@ -515,6 +651,7 @@ gst_rtsp_media_prepare (GstRTSPMedia *media)
/* first go to PAUSED */
ret = gst_element_set_state (media->pipeline, GST_STATE_PAUSED);
media->target_state = GST_STATE_PAUSED;
switch (ret) {
case GST_STATE_CHANGE_SUCCESS:
@ -524,6 +661,7 @@ gst_rtsp_media_prepare (GstRTSPMedia *media)
case GST_STATE_CHANGE_NO_PREROLL:
/* we need to go to PLAYING */
g_message ("live media %p", media);
media->is_live = TRUE;
ret = gst_element_set_state (media->pipeline, GST_STATE_PLAYING);
break;
case GST_STATE_CHANGE_FAILURE:
@ -539,18 +677,21 @@ gst_rtsp_media_prepare (GstRTSPMedia *media)
/* collect stats about the media */
collect_media_stats (media);
/* unlock the udp src elements */
n_streams = gst_rtsp_media_n_streams (media);
for (i = 0; i < n_streams; i++) {
GstRTSPMediaStream *stream;
stream = gst_rtsp_media_get_stream (media, i);
gst_element_set_locked_state (stream->udpsrc[0], FALSE);
gst_element_set_locked_state (stream->udpsrc[1], FALSE);
}
/* unlock the streams so that they follow the state changes from now on */
unlock_streams (media);
g_message ("object %p is prerolled", media);
/* add the pipeline bus to our custom mainloop */
bus = gst_pipeline_get_bus (GST_PIPELINE_CAST (media->pipeline));
media->source = gst_bus_create_watch (bus);
gst_object_unref (bus);
g_source_set_callback (media->source, (GSourceFunc) bus_message, media, NULL);
klass = GST_RTSP_MEDIA_GET_CLASS (media);
media->id = g_source_attach (media->source, klass->context);
media->prepared = TRUE;
return TRUE;
@ -563,7 +704,33 @@ was_prepared:
/* ERRORS */
state_failed:
{
GstMessage *message;
g_message ("state change failed for media %p", media);
while ((message = gst_bus_pop (bus))) {
GstMessageType type;
type = GST_MESSAGE_TYPE (message);
switch (type) {
case GST_MESSAGE_ERROR:
{
GError *gerror;
gchar *debug;
gst_message_parse_error (message, &gerror, &debug);
g_warning ("%p: got error %s (%s)", media, gerror->message, debug);
g_error_free (gerror);
g_free (debug);
break;
}
default:
break;
}
gst_message_unref (message);
}
unlock_streams (media);
gst_element_set_state (media->pipeline, GST_STATE_NULL);
gst_object_unref (bus);
return FALSE;
}
}
@ -611,6 +778,7 @@ gst_rtsp_media_play (GstRTSPMedia *media, GArray *transports)
}
g_message ("playing");
media->target_state = GST_STATE_PLAYING;
ret = gst_element_set_state (media->pipeline, GST_STATE_PLAYING);
return TRUE;
@ -659,6 +827,7 @@ gst_rtsp_media_pause (GstRTSPMedia *media, GArray *transports)
}
g_message ("pause");
media->target_state = GST_STATE_PAUSED;
ret = gst_element_set_state (media->pipeline, GST_STATE_PAUSED);
return TRUE;
@ -685,6 +854,7 @@ gst_rtsp_media_stop (GstRTSPMedia *media, GArray *transports)
gst_rtsp_media_pause (media, transports);
g_message ("stop");
media->target_state = GST_STATE_NULL;
ret = gst_element_set_state (media->pipeline, GST_STATE_NULL);
return TRUE;

View file

@ -124,6 +124,12 @@ struct _GstRTSPMedia {
/* the pipeline for the media */
GstElement *pipeline;
GSource *source;
guint id;
gboolean is_live;
gboolean buffering;
GstState target_state;
/* RTP session manager */
GstElement *rtpbin;
@ -135,8 +141,24 @@ struct _GstRTSPMedia {
GstRTSPTimeRange range;
};
/**
* GstRTSPMediaClass:
* @context: the main context for dispatching messages
* @loop: the mainloop for message.
* @thread: the thread dispatching messages.
* @handle_message: handle a message
*
* The RTSP media class
*/
struct _GstRTSPMediaClass {
GObjectClass parent_class;
/* thread for the mainloop */
GMainContext *context;
GMainLoop *loop;
GThread *thread;
gboolean (*handle_message) (GstRTSPMedia *media, GstMessage *message);
};
GType gst_rtsp_media_get_type (void);