rtspsrc: implement custom event handler

Extend the _push_event() function so that it can also send events to the udp
sources when asked.
Implement a custum send_event function that correctly dispatches the downstream
events in TCP mode. This fixes sending EOS to rtspsrc and have it push the EOS
downstream.
This commit is contained in:
Wim Taymans 2010-08-19 17:06:26 +02:00
parent 95270dc2fb
commit e4f8144bbf

View file

@ -225,6 +225,7 @@ static GstCaps *gst_rtspsrc_media_to_caps (gint pt, const GstSDPMedia * media);
static GstStateChangeReturn gst_rtspsrc_change_state (GstElement * element, static GstStateChangeReturn gst_rtspsrc_change_state (GstElement * element,
GstStateChange transition); GstStateChange transition);
static gboolean gst_rtspsrc_send_event (GstElement * element, GstEvent * event);
static void gst_rtspsrc_handle_message (GstBin * bin, GstMessage * message); static void gst_rtspsrc_handle_message (GstBin * bin, GstMessage * message);
static void gst_rtspsrc_loop_send_cmd (GstRTSPSrc * src, gint cmd, static void gst_rtspsrc_loop_send_cmd (GstRTSPSrc * src, gint cmd,
@ -242,9 +243,10 @@ static gboolean gst_rtspsrc_uri_set_uri (GstURIHandler * handler,
static gboolean gst_rtspsrc_activate_streams (GstRTSPSrc * src); static gboolean gst_rtspsrc_activate_streams (GstRTSPSrc * src);
static void gst_rtspsrc_loop (GstRTSPSrc * src); static void gst_rtspsrc_loop (GstRTSPSrc * src);
static void gst_rtspsrc_stream_push_event (GstRTSPSrc * src, static gboolean gst_rtspsrc_stream_push_event (GstRTSPSrc * src,
GstRTSPStream * stream, GstEvent * event); GstRTSPStream * stream, GstEvent * event, gboolean source);
static void gst_rtspsrc_push_event (GstRTSPSrc * src, GstEvent * event); static gboolean gst_rtspsrc_push_event (GstRTSPSrc * src, GstEvent * event,
gboolean source);
static gchar *gst_rtspsrc_dup_printf (const gchar * format, ...); static gchar *gst_rtspsrc_dup_printf (const gchar * format, ...);
/* commands we send to out loop to notify it of events */ /* commands we send to out loop to notify it of events */
@ -429,6 +431,7 @@ gst_rtspsrc_class_init (GstRTSPSrcClass * klass)
"eg. 3000-3005 (NULL = no restrictions)", DEFAULT_PORT_RANGE, "eg. 3000-3005 (NULL = no restrictions)", DEFAULT_PORT_RANGE,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
gstelement_class->send_event = gst_rtspsrc_send_event;
gstelement_class->change_state = gst_rtspsrc_change_state; gstelement_class->change_state = gst_rtspsrc_change_state;
gstbin_class->handle_message = gst_rtspsrc_handle_message; gstbin_class->handle_message = gst_rtspsrc_handle_message;
@ -1604,7 +1607,7 @@ gst_rtspsrc_flush (GstRTSPSrc * src, gboolean flush)
gst_object_unref (clock); gst_object_unref (clock);
} }
} }
gst_rtspsrc_push_event (src, event); gst_rtspsrc_push_event (src, event, FALSE);
gst_rtspsrc_loop_send_cmd (src, cmd, flush); gst_rtspsrc_loop_send_cmd (src, cmd, flush);
/* make running time start start at 0 again */ /* make running time start start at 0 again */
@ -2197,7 +2200,7 @@ gst_rtspsrc_do_stream_eos (GstRTSPSrc * src, guint session)
goto was_eos; goto was_eos;
stream->eos = TRUE; stream->eos = TRUE;
gst_rtspsrc_stream_push_event (src, stream, gst_event_new_eos ()); gst_rtspsrc_stream_push_event (src, stream, gst_event_new_eos (), TRUE);
return; return;
/* ERRORS */ /* ERRORS */
@ -3085,46 +3088,59 @@ done:
return ret; return ret;
} }
static void static gboolean
gst_rtspsrc_stream_push_event (GstRTSPSrc * src, GstRTSPStream * stream, gst_rtspsrc_stream_push_event (GstRTSPSrc * src, GstRTSPStream * stream,
GstEvent * event) GstEvent * event, gboolean source)
{ {
gboolean res = TRUE;
/* only streams that have a connection to the outside world */ /* only streams that have a connection to the outside world */
if (stream->srcpad == NULL) if (stream->srcpad == NULL)
goto done; goto done;
if (stream->channelpad[0]) { if (source && stream->udpsrc[0]) {
gst_event_ref (event);
res = gst_element_send_event (stream->udpsrc[0], event);
} else if (stream->channelpad[0]) {
gst_event_ref (event); gst_event_ref (event);
if (GST_PAD_IS_SRC (stream->channelpad[0])) if (GST_PAD_IS_SRC (stream->channelpad[0]))
gst_pad_push_event (stream->channelpad[0], event); res = gst_pad_push_event (stream->channelpad[0], event);
else else
gst_pad_send_event (stream->channelpad[0], event); res = gst_pad_send_event (stream->channelpad[0], event);
} }
if (stream->channelpad[1]) { if (source && stream->udpsrc[1]) {
gst_event_ref (event);
res &= gst_element_send_event (stream->udpsrc[1], event);
} else if (stream->channelpad[1]) {
gst_event_ref (event); gst_event_ref (event);
if (GST_PAD_IS_SRC (stream->channelpad[1])) if (GST_PAD_IS_SRC (stream->channelpad[1]))
gst_pad_push_event (stream->channelpad[1], event); res &= gst_pad_push_event (stream->channelpad[1], event);
else else
gst_pad_send_event (stream->channelpad[1], event); res &= gst_pad_send_event (stream->channelpad[1], event);
} }
done: done:
gst_event_unref (event); gst_event_unref (event);
return res;
} }
static void static gboolean
gst_rtspsrc_push_event (GstRTSPSrc * src, GstEvent * event) gst_rtspsrc_push_event (GstRTSPSrc * src, GstEvent * event, gboolean source)
{ {
GList *streams; GList *streams;
gboolean res = TRUE;
for (streams = src->streams; streams; streams = g_list_next (streams)) { for (streams = src->streams; streams; streams = g_list_next (streams)) {
GstRTSPStream *ostream = (GstRTSPStream *) streams->data; GstRTSPStream *ostream = (GstRTSPStream *) streams->data;
gst_event_ref (event); gst_event_ref (event);
gst_rtspsrc_stream_push_event (src, ostream, event); res &= gst_rtspsrc_stream_push_event (src, ostream, event, source);
} }
gst_event_unref (event); gst_event_unref (event);
return res;
} }
static GstRTSPResult static GstRTSPResult
@ -3857,7 +3873,7 @@ pause:
gst_message_new_segment_done (GST_OBJECT_CAST (src), gst_message_new_segment_done (GST_OBJECT_CAST (src),
src->segment.format, src->segment.last_stop)); src->segment.format, src->segment.last_stop));
} else { } else {
gst_rtspsrc_push_event (src, gst_event_new_eos ()); gst_rtspsrc_push_event (src, gst_event_new_eos (), FALSE);
} }
} else if (ret == GST_FLOW_NOT_LINKED || ret < GST_FLOW_UNEXPECTED) { } else if (ret == GST_FLOW_NOT_LINKED || ret < GST_FLOW_UNEXPECTED) {
/* for fatal errors we post an error message, post the error before the /* for fatal errors we post an error message, post the error before the
@ -3865,7 +3881,7 @@ pause:
GST_ELEMENT_ERROR (src, STREAM, FAILED, GST_ELEMENT_ERROR (src, STREAM, FAILED,
("Internal data flow error."), ("Internal data flow error."),
("streaming task paused, reason %s (%d)", reason, ret)); ("streaming task paused, reason %s (%d)", reason, ret));
gst_rtspsrc_push_event (src, gst_event_new_eos ()); gst_rtspsrc_push_event (src, gst_event_new_eos (), FALSE);
} }
return; return;
} }
@ -6089,6 +6105,24 @@ open_failed:
} }
} }
static gboolean
gst_rtspsrc_send_event (GstElement * element, GstEvent * event)
{
gboolean res;
GstRTSPSrc *rtspsrc;
rtspsrc = GST_RTSPSRC (element);
if (GST_EVENT_IS_DOWNSTREAM (event)) {
res = gst_rtspsrc_push_event (rtspsrc, event, TRUE);
} else {
res = GST_ELEMENT_CLASS (parent_class)->send_event (element, event);
}
return res;
}
/*** GSTURIHANDLER INTERFACE *************************************************/ /*** GSTURIHANDLER INTERFACE *************************************************/
static GstURIType static GstURIType