server: use appsink and appsrc with the API

Use the appsink/appsrc API instead of the signals for higher
performance.
This commit is contained in:
Wim Taymans 2009-04-14 23:38:58 +02:00 committed by Wim Taymans
parent 5a074c81dd
commit 3f1f38f479
3 changed files with 40 additions and 15 deletions

View file

@ -29,7 +29,9 @@ libgstrtspserver_@GST_MAJORMINOR@_la_LDFLAGS = $(GST_PLUGIN_LDFLAGS)
libgstrtspserver_@GST_MAJORMINOR@_la_LIBADD = \ libgstrtspserver_@GST_MAJORMINOR@_la_LIBADD = \
$(GST_PLUGINS_BASE_LIBS) $(GST_BASE_LIBS) \ $(GST_PLUGINS_BASE_LIBS) $(GST_BASE_LIBS) \
-lgstrtp-@GST_MAJORMINOR@ -lgstrtsp-@GST_MAJORMINOR@ \ -lgstrtp-@GST_MAJORMINOR@ -lgstrtsp-@GST_MAJORMINOR@ \
-lgstsdp-@GST_MAJORMINOR@ $(GST_LIBS) $(LIBM) -lgstsdp-@GST_MAJORMINOR@ \
-lgstapp-@GST_MAJORMINOR@ \
$(GST_LIBS) $(LIBM)
libgstrtspserver_@GST_MAJORMINOR@_la_LIBTOOLFLAGS = --tag=disable-static libgstrtspserver_@GST_MAJORMINOR@_la_LIBTOOLFLAGS = --tag=disable-static
libgstrtspserver_@GST_MAJORMINOR@includedir = $(includedir)/gstreamer-@GST_MAJORMINOR@/gst/rtsp-server libgstrtspserver_@GST_MAJORMINOR@includedir = $(includedir)/gstreamer-@GST_MAJORMINOR@/gst/rtsp-server

View file

@ -1017,6 +1017,7 @@ handle_data (GstRTSPClient *client, GstRTSPMessage *message)
guint8 *data; guint8 *data;
guint size; guint size;
GstBuffer *buffer; GstBuffer *buffer;
gboolean handled;
/* find the stream for this message */ /* find the stream for this message */
res = gst_rtsp_message_parse_data (message, &channel); res = gst_rtsp_message_parse_data (message, &channel);
@ -1030,6 +1031,7 @@ handle_data (GstRTSPClient *client, GstRTSPMessage *message)
GST_BUFFER_MALLOCDATA (buffer) = data; GST_BUFFER_MALLOCDATA (buffer) = data;
GST_BUFFER_SIZE (buffer) = size; GST_BUFFER_SIZE (buffer) = size;
handled = FALSE;
for (walk = client->streams; walk; walk = g_list_next (walk)) { for (walk = client->streams; walk; walk = g_list_next (walk)) {
GstRTSPSessionStream *stream = (GstRTSPSessionStream *) walk->data; GstRTSPSessionStream *stream = (GstRTSPSessionStream *) walk->data;
GstRTSPMediaStream *mstream; GstRTSPMediaStream *mstream;
@ -1048,12 +1050,17 @@ handle_data (GstRTSPClient *client, GstRTSPMessage *message)
/* dispatch to the stream based on the channel number */ /* dispatch to the stream based on the channel number */
if (tr->interleaved.min == channel) { if (tr->interleaved.min == channel) {
gst_rtsp_media_stream_rtp (mstream, buffer); gst_rtsp_media_stream_rtp (mstream, buffer);
handled = TRUE;
break;
} else if (tr->interleaved.max == channel) { } else if (tr->interleaved.max == channel) {
gst_rtsp_media_stream_rtcp (mstream, buffer); gst_rtsp_media_stream_rtcp (mstream, buffer);
handled = TRUE;
break;
} }
} }
} }
gst_buffer_unref (buffer); if (!handled)
gst_buffer_unref (buffer);
} }
/** /**

View file

@ -17,6 +17,9 @@
* Boston, MA 02111-1307, USA. * Boston, MA 02111-1307, USA.
*/ */
#include <gst/app/gstappsrc.h>
#include <gst/app/gstappsink.h>
#include "rtsp-media.h" #include "rtsp-media.h"
#define DEFAULT_SHARED FALSE #define DEFAULT_SHARED FALSE
@ -468,6 +471,8 @@ weird_type:
* Handle an RTP buffer for the stream. This method is usually called when a * Handle an RTP buffer for the stream. This method is usually called when a
* message has been received from a client using the TCP transport. * message has been received from a client using the TCP transport.
* *
* This function takes ownership of @buffer.
*
* Returns: a GstFlowReturn. * Returns: a GstFlowReturn.
*/ */
GstFlowReturn GstFlowReturn
@ -475,7 +480,7 @@ gst_rtsp_media_stream_rtp (GstRTSPMediaStream *stream, GstBuffer *buffer)
{ {
GstFlowReturn ret; GstFlowReturn ret;
g_signal_emit_by_name (stream->appsrc[0], "push-buffer", buffer, &ret); ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (stream->appsrc[0]), buffer);
return ret; return ret;
} }
@ -488,6 +493,8 @@ gst_rtsp_media_stream_rtp (GstRTSPMediaStream *stream, GstBuffer *buffer)
* Handle an RTCP buffer for the stream. This method is usually called when a * Handle an RTCP buffer for the stream. This method is usually called when a
* message has been received from a client using the TCP transport. * message has been received from a client using the TCP transport.
* *
* This function takes ownership of @buffer.
*
* Returns: a GstFlowReturn. * Returns: a GstFlowReturn.
*/ */
GstFlowReturn GstFlowReturn
@ -495,9 +502,9 @@ gst_rtsp_media_stream_rtcp (GstRTSPMediaStream *stream, GstBuffer *buffer)
{ {
GstFlowReturn ret; GstFlowReturn ret;
g_signal_emit_by_name (stream->appsrc[1], "push-buffer", buffer, &ret); ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (stream->appsrc[1]), buffer);
return GST_FLOW_ERROR; return ret;
} }
/* Allocate the udp ports and sockets */ /* Allocate the udp ports and sockets */
@ -710,20 +717,23 @@ on_timeout (GObject *session, GObject *source, GstRTSPMedia *media)
g_message ("%p: source %p timeout", media, source); g_message ("%p: source %p timeout", media, source);
} }
static void static GstFlowReturn
handle_new_buffer (GstElement *sink, GstRTSPMediaStream *stream) handle_new_buffer (GstAppSink *sink, gpointer user_data)
{ {
GList *walk; GList *walk;
GstBuffer *buffer; GstBuffer *buffer;
GstRTSPMediaStream *stream;
g_signal_emit_by_name (sink, "pull-buffer", &buffer); buffer = gst_app_sink_pull_buffer (sink);
if (!buffer) if (!buffer)
return; return GST_FLOW_OK;
stream = (GstRTSPMediaStream *) user_data;
for (walk = stream->transports; walk; walk = g_list_next (walk)) { for (walk = stream->transports; walk; walk = g_list_next (walk)) {
GstRTSPMediaTrans *tr = (GstRTSPMediaTrans *) walk->data; GstRTSPMediaTrans *tr = (GstRTSPMediaTrans *) walk->data;
if (sink == stream->appsink[0]) { if (GST_ELEMENT_CAST (sink) == stream->appsink[0]) {
if (tr->send_rtp) if (tr->send_rtp)
tr->send_rtp (buffer, tr->transport->interleaved.min, tr->user_data); tr->send_rtp (buffer, tr->transport->interleaved.min, tr->user_data);
} }
@ -733,8 +743,16 @@ handle_new_buffer (GstElement *sink, GstRTSPMediaStream *stream)
} }
} }
gst_buffer_unref (buffer); gst_buffer_unref (buffer);
return GST_FLOW_OK;
} }
static GstAppSinkCallbacks sink_cb = {
NULL, /* not interested in EOS */
NULL, /* not interested in preroll buffers */
handle_new_buffer
};
/* prepare the pipeline objects to handle @stream in @media */ /* prepare the pipeline objects to handle @stream in @media */
static gboolean static gboolean
setup_stream (GstRTSPMediaStream *stream, guint idx, GstRTSPMedia *media) setup_stream (GstRTSPMediaStream *stream, guint idx, GstRTSPMedia *media)
@ -762,15 +780,13 @@ setup_stream (GstRTSPMediaStream *stream, guint idx, GstRTSPMedia *media)
stream->appsrc[i] = gst_element_factory_make ("appsrc", NULL); stream->appsrc[i] = gst_element_factory_make ("appsrc", NULL);
stream->appsink[i] = gst_element_factory_make ("appsink", NULL); stream->appsink[i] = gst_element_factory_make ("appsink", NULL);
g_object_set (stream->appsink[i], "async", FALSE, "sync", FALSE, NULL); g_object_set (stream->appsink[i], "async", FALSE, "sync", FALSE, NULL);
g_object_set (stream->appsink[i], "emit-signals", TRUE, NULL); g_object_set (stream->appsink[i], "emit-signals", FALSE, NULL);
g_object_set (stream->appsink[i], "preroll-queue-len", 1, NULL); g_object_set (stream->appsink[i], "preroll-queue-len", 1, NULL);
gst_bin_add (GST_BIN_CAST (media->pipeline), stream->appsink[i]); gst_bin_add (GST_BIN_CAST (media->pipeline), stream->appsink[i]);
gst_bin_add (GST_BIN_CAST (media->pipeline), stream->appsrc[i]); gst_bin_add (GST_BIN_CAST (media->pipeline), stream->appsrc[i]);
gst_app_sink_set_callbacks (GST_APP_SINK_CAST (stream->appsink[i]),
&sink_cb, stream, NULL);
} }
g_signal_connect (stream->appsink[0], "new-buffer",
(GCallback) handle_new_buffer, stream);
g_signal_connect (stream->appsink[1], "new-buffer",
(GCallback) handle_new_buffer, stream);
/* hook up the stream to the RTP session elements. */ /* hook up the stream to the RTP session elements. */
name = g_strdup_printf ("send_rtp_sink_%d", idx); name = g_strdup_printf ("send_rtp_sink_%d", idx);