From e4f8144bbfac5f52b6bb0b069834197eac50661d Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Thu, 19 Aug 2010 17:06:26 +0200 Subject: [PATCH] rtspsrc: implement custom event handler Extend the _push_event() function so that it can also send events to the udp sources when asked. Implement a custum send_event function that correctly dispatches the downstream events in TCP mode. This fixes sending EOS to rtspsrc and have it push the EOS downstream. --- gst/rtsp/gstrtspsrc.c | 70 ++++++++++++++++++++++++++++++++----------- 1 file changed, 52 insertions(+), 18 deletions(-) diff --git a/gst/rtsp/gstrtspsrc.c b/gst/rtsp/gstrtspsrc.c index 11ac0a8f5d..d0fdcb615d 100644 --- a/gst/rtsp/gstrtspsrc.c +++ b/gst/rtsp/gstrtspsrc.c @@ -225,6 +225,7 @@ static GstCaps *gst_rtspsrc_media_to_caps (gint pt, const GstSDPMedia * media); static GstStateChangeReturn gst_rtspsrc_change_state (GstElement * element, GstStateChange transition); +static gboolean gst_rtspsrc_send_event (GstElement * element, GstEvent * event); static void gst_rtspsrc_handle_message (GstBin * bin, GstMessage * message); static void gst_rtspsrc_loop_send_cmd (GstRTSPSrc * src, gint cmd, @@ -242,9 +243,10 @@ static gboolean gst_rtspsrc_uri_set_uri (GstURIHandler * handler, static gboolean gst_rtspsrc_activate_streams (GstRTSPSrc * src); static void gst_rtspsrc_loop (GstRTSPSrc * src); -static void gst_rtspsrc_stream_push_event (GstRTSPSrc * src, - GstRTSPStream * stream, GstEvent * event); -static void gst_rtspsrc_push_event (GstRTSPSrc * src, GstEvent * event); +static gboolean gst_rtspsrc_stream_push_event (GstRTSPSrc * src, + GstRTSPStream * stream, GstEvent * event, gboolean source); +static gboolean gst_rtspsrc_push_event (GstRTSPSrc * src, GstEvent * event, + gboolean source); static gchar *gst_rtspsrc_dup_printf (const gchar * format, ...); /* commands we send to out loop to notify it of events */ @@ -429,6 +431,7 @@ gst_rtspsrc_class_init (GstRTSPSrcClass * klass) "eg. 3000-3005 (NULL = no restrictions)", DEFAULT_PORT_RANGE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + gstelement_class->send_event = gst_rtspsrc_send_event; gstelement_class->change_state = gst_rtspsrc_change_state; gstbin_class->handle_message = gst_rtspsrc_handle_message; @@ -1604,7 +1607,7 @@ gst_rtspsrc_flush (GstRTSPSrc * src, gboolean flush) gst_object_unref (clock); } } - gst_rtspsrc_push_event (src, event); + gst_rtspsrc_push_event (src, event, FALSE); gst_rtspsrc_loop_send_cmd (src, cmd, flush); /* make running time start start at 0 again */ @@ -2197,7 +2200,7 @@ gst_rtspsrc_do_stream_eos (GstRTSPSrc * src, guint session) goto was_eos; stream->eos = TRUE; - gst_rtspsrc_stream_push_event (src, stream, gst_event_new_eos ()); + gst_rtspsrc_stream_push_event (src, stream, gst_event_new_eos (), TRUE); return; /* ERRORS */ @@ -3085,46 +3088,59 @@ done: return ret; } -static void +static gboolean gst_rtspsrc_stream_push_event (GstRTSPSrc * src, GstRTSPStream * stream, - GstEvent * event) + GstEvent * event, gboolean source) { + gboolean res = TRUE; + /* only streams that have a connection to the outside world */ if (stream->srcpad == NULL) goto done; - if (stream->channelpad[0]) { + if (source && stream->udpsrc[0]) { + gst_event_ref (event); + res = gst_element_send_event (stream->udpsrc[0], event); + } else if (stream->channelpad[0]) { gst_event_ref (event); if (GST_PAD_IS_SRC (stream->channelpad[0])) - gst_pad_push_event (stream->channelpad[0], event); + res = gst_pad_push_event (stream->channelpad[0], event); else - gst_pad_send_event (stream->channelpad[0], event); + res = gst_pad_send_event (stream->channelpad[0], event); } - if (stream->channelpad[1]) { + if (source && stream->udpsrc[1]) { + gst_event_ref (event); + res &= gst_element_send_event (stream->udpsrc[1], event); + } else if (stream->channelpad[1]) { gst_event_ref (event); if (GST_PAD_IS_SRC (stream->channelpad[1])) - gst_pad_push_event (stream->channelpad[1], event); + res &= gst_pad_push_event (stream->channelpad[1], event); else - gst_pad_send_event (stream->channelpad[1], event); + res &= gst_pad_send_event (stream->channelpad[1], event); } done: gst_event_unref (event); + + return res; } -static void -gst_rtspsrc_push_event (GstRTSPSrc * src, GstEvent * event) +static gboolean +gst_rtspsrc_push_event (GstRTSPSrc * src, GstEvent * event, gboolean source) { GList *streams; + gboolean res = TRUE; for (streams = src->streams; streams; streams = g_list_next (streams)) { GstRTSPStream *ostream = (GstRTSPStream *) streams->data; gst_event_ref (event); - gst_rtspsrc_stream_push_event (src, ostream, event); + res &= gst_rtspsrc_stream_push_event (src, ostream, event, source); } gst_event_unref (event); + + return res; } static GstRTSPResult @@ -3857,7 +3873,7 @@ pause: gst_message_new_segment_done (GST_OBJECT_CAST (src), src->segment.format, src->segment.last_stop)); } else { - gst_rtspsrc_push_event (src, gst_event_new_eos ()); + gst_rtspsrc_push_event (src, gst_event_new_eos (), FALSE); } } else if (ret == GST_FLOW_NOT_LINKED || ret < GST_FLOW_UNEXPECTED) { /* for fatal errors we post an error message, post the error before the @@ -3865,7 +3881,7 @@ pause: GST_ELEMENT_ERROR (src, STREAM, FAILED, ("Internal data flow error."), ("streaming task paused, reason %s (%d)", reason, ret)); - gst_rtspsrc_push_event (src, gst_event_new_eos ()); + gst_rtspsrc_push_event (src, gst_event_new_eos (), FALSE); } return; } @@ -6089,6 +6105,24 @@ open_failed: } } +static gboolean +gst_rtspsrc_send_event (GstElement * element, GstEvent * event) +{ + gboolean res; + GstRTSPSrc *rtspsrc; + + rtspsrc = GST_RTSPSRC (element); + + if (GST_EVENT_IS_DOWNSTREAM (event)) { + res = gst_rtspsrc_push_event (rtspsrc, event, TRUE); + } else { + res = GST_ELEMENT_CLASS (parent_class)->send_event (element, event); + } + + return res; +} + + /*** GSTURIHANDLER INTERFACE *************************************************/ static GstURIType