mirror of
https://gitlab.freedesktop.org/gstreamer/gstreamer.git
synced 2025-01-04 14:38:48 +00:00
rtspclientsink: Use a mutex for protecting against concurrent send/receives
This is a simple port of: *a722f6e832
*c438545dc9
*cd17c71dce
in gst-plugins-good.
This commit is contained in:
parent
d690fbd37d
commit
f1088f368f
2 changed files with 55 additions and 34 deletions
|
@ -666,6 +666,9 @@ gst_rtsp_client_sink_init (GstRTSPClientSink * sink)
|
||||||
|
|
||||||
sink->state = GST_RTSP_STATE_INVALID;
|
sink->state = GST_RTSP_STATE_INVALID;
|
||||||
|
|
||||||
|
g_mutex_init (&sink->conninfo.send_lock);
|
||||||
|
g_mutex_init (&sink->conninfo.recv_lock);
|
||||||
|
|
||||||
sink->internal_bin = (GstBin *) gst_bin_new ("rtspbin");
|
sink->internal_bin = (GstBin *) gst_bin_new ("rtspbin");
|
||||||
gst_element_set_locked_state (GST_ELEMENT_CAST (sink->internal_bin), TRUE);
|
gst_element_set_locked_state (GST_ELEMENT_CAST (sink->internal_bin), TRUE);
|
||||||
gst_bin_add (GST_BIN (sink), GST_ELEMENT_CAST (sink->internal_bin));
|
gst_bin_add (GST_BIN (sink), GST_ELEMENT_CAST (sink->internal_bin));
|
||||||
|
@ -714,6 +717,9 @@ gst_rtsp_client_sink_finalize (GObject * object)
|
||||||
g_rec_mutex_clear (&rtsp_client_sink->stream_rec_lock);
|
g_rec_mutex_clear (&rtsp_client_sink->stream_rec_lock);
|
||||||
g_rec_mutex_clear (&rtsp_client_sink->state_rec_lock);
|
g_rec_mutex_clear (&rtsp_client_sink->state_rec_lock);
|
||||||
|
|
||||||
|
g_mutex_clear (&rtsp_client_sink->conninfo.send_lock);
|
||||||
|
g_mutex_clear (&rtsp_client_sink->conninfo.recv_lock);
|
||||||
|
|
||||||
g_mutex_clear (&rtsp_client_sink->send_lock);
|
g_mutex_clear (&rtsp_client_sink->send_lock);
|
||||||
|
|
||||||
g_mutex_clear (&rtsp_client_sink->preroll_lock);
|
g_mutex_clear (&rtsp_client_sink->preroll_lock);
|
||||||
|
@ -1137,6 +1143,9 @@ gst_rtsp_client_sink_request_new_pad (GstElement * element,
|
||||||
|
|
||||||
(void) gst_rtsp_client_sink_get_factories ();
|
(void) gst_rtsp_client_sink_get_factories ();
|
||||||
|
|
||||||
|
g_mutex_init (&context->conninfo.send_lock);
|
||||||
|
g_mutex_init (&context->conninfo.recv_lock);
|
||||||
|
|
||||||
GST_RTSP_STATE_LOCK (sink);
|
GST_RTSP_STATE_LOCK (sink);
|
||||||
sink->contexts = g_list_prepend (sink->contexts, context);
|
sink->contexts = g_list_prepend (sink->contexts, context);
|
||||||
GST_RTSP_STATE_UNLOCK (sink);
|
GST_RTSP_STATE_UNLOCK (sink);
|
||||||
|
@ -1182,6 +1191,9 @@ gst_rtsp_client_sink_release_pad (GstElement * element, GstPad * pad)
|
||||||
g_free (context->conninfo.location);
|
g_free (context->conninfo.location);
|
||||||
context->conninfo.location = NULL;
|
context->conninfo.location = NULL;
|
||||||
|
|
||||||
|
g_mutex_clear (&context->conninfo.send_lock);
|
||||||
|
g_mutex_clear (&context->conninfo.recv_lock);
|
||||||
|
|
||||||
g_free (context);
|
g_free (context);
|
||||||
|
|
||||||
gst_element_remove_pad (element, pad);
|
gst_element_remove_pad (element, pad);
|
||||||
|
@ -1603,28 +1615,34 @@ gst_rtsp_client_sink_cleanup (GstRTSPClientSink * sink)
|
||||||
|
|
||||||
static GstRTSPResult
|
static GstRTSPResult
|
||||||
gst_rtsp_client_sink_connection_send (GstRTSPClientSink * sink,
|
gst_rtsp_client_sink_connection_send (GstRTSPClientSink * sink,
|
||||||
GstRTSPConnection * conn, GstRTSPMessage * message, GTimeVal * timeout)
|
GstRTSPConnInfo * conninfo, GstRTSPMessage * message, GTimeVal * timeout)
|
||||||
{
|
{
|
||||||
GstRTSPResult ret;
|
GstRTSPResult ret;
|
||||||
|
|
||||||
if (conn)
|
if (conninfo->connection) {
|
||||||
ret = gst_rtsp_connection_send (conn, message, timeout);
|
g_mutex_lock (&conninfo->send_lock);
|
||||||
else
|
ret = gst_rtsp_connection_send (conninfo->connection, message, timeout);
|
||||||
|
g_mutex_unlock (&conninfo->send_lock);
|
||||||
|
} else {
|
||||||
ret = GST_RTSP_ERROR;
|
ret = GST_RTSP_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
static GstRTSPResult
|
static GstRTSPResult
|
||||||
gst_rtsp_client_sink_connection_receive (GstRTSPClientSink * sink,
|
gst_rtsp_client_sink_connection_receive (GstRTSPClientSink * sink,
|
||||||
GstRTSPConnection * conn, GstRTSPMessage * message, GTimeVal * timeout)
|
GstRTSPConnInfo * conninfo, GstRTSPMessage * message, GTimeVal * timeout)
|
||||||
{
|
{
|
||||||
GstRTSPResult ret;
|
GstRTSPResult ret;
|
||||||
|
|
||||||
if (conn)
|
if (conninfo->connection) {
|
||||||
ret = gst_rtsp_connection_receive (conn, message, timeout);
|
g_mutex_lock (&conninfo->recv_lock);
|
||||||
else
|
ret = gst_rtsp_connection_receive (conninfo->connection, message, timeout);
|
||||||
|
g_mutex_unlock (&conninfo->recv_lock);
|
||||||
|
} else {
|
||||||
ret = GST_RTSP_ERROR;
|
ret = GST_RTSP_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
@ -1793,7 +1811,7 @@ gst_rtsp_client_sink_init_request (GstRTSPClientSink * sink,
|
||||||
/* FIXME, handle server request, reply with OK, for now */
|
/* FIXME, handle server request, reply with OK, for now */
|
||||||
static GstRTSPResult
|
static GstRTSPResult
|
||||||
gst_rtsp_client_sink_handle_request (GstRTSPClientSink * sink,
|
gst_rtsp_client_sink_handle_request (GstRTSPClientSink * sink,
|
||||||
GstRTSPConnection * conn, GstRTSPMessage * request)
|
GstRTSPConnInfo * conninfo, GstRTSPMessage * request)
|
||||||
{
|
{
|
||||||
GstRTSPMessage response = { 0 };
|
GstRTSPMessage response = { 0 };
|
||||||
GstRTSPResult res;
|
GstRTSPResult res;
|
||||||
|
@ -1818,7 +1836,7 @@ gst_rtsp_client_sink_handle_request (GstRTSPClientSink * sink,
|
||||||
if (sink->debug)
|
if (sink->debug)
|
||||||
gst_rtsp_message_dump (&response);
|
gst_rtsp_message_dump (&response);
|
||||||
|
|
||||||
res = gst_rtsp_client_sink_connection_send (sink, conn, &response, NULL);
|
res = gst_rtsp_client_sink_connection_send (sink, conninfo, &response, NULL);
|
||||||
if (res < 0)
|
if (res < 0)
|
||||||
goto send_error;
|
goto send_error;
|
||||||
|
|
||||||
|
@ -1869,7 +1887,7 @@ gst_rtsp_client_sink_send_keep_alive (GstRTSPClientSink * sink)
|
||||||
gst_rtsp_message_dump (&request);
|
gst_rtsp_message_dump (&request);
|
||||||
|
|
||||||
res =
|
res =
|
||||||
gst_rtsp_client_sink_connection_send (sink, sink->conninfo.connection,
|
gst_rtsp_client_sink_connection_send (sink, &sink->conninfo,
|
||||||
&request, NULL);
|
&request, NULL);
|
||||||
if (res < 0)
|
if (res < 0)
|
||||||
goto send_error;
|
goto send_error;
|
||||||
|
@ -1920,7 +1938,7 @@ gst_rtsp_client_sink_loop_rx (GstRTSPClientSink * sink)
|
||||||
* keep-alive request to keep the session open. */
|
* keep-alive request to keep the session open. */
|
||||||
res =
|
res =
|
||||||
gst_rtsp_client_sink_connection_receive (sink,
|
gst_rtsp_client_sink_connection_receive (sink,
|
||||||
sink->conninfo.connection, &message, &tv_timeout);
|
&sink->conninfo, &message, &tv_timeout);
|
||||||
|
|
||||||
switch (res) {
|
switch (res) {
|
||||||
case GST_RTSP_OK:
|
case GST_RTSP_OK:
|
||||||
|
@ -1964,7 +1982,7 @@ gst_rtsp_client_sink_loop_rx (GstRTSPClientSink * sink)
|
||||||
/* server sends us a request message, handle it */
|
/* server sends us a request message, handle it */
|
||||||
res =
|
res =
|
||||||
gst_rtsp_client_sink_handle_request (sink,
|
gst_rtsp_client_sink_handle_request (sink,
|
||||||
sink->conninfo.connection, &message);
|
&sink->conninfo, &message);
|
||||||
if (res == GST_RTSP_EEOF)
|
if (res == GST_RTSP_EEOF)
|
||||||
goto server_eof;
|
goto server_eof;
|
||||||
else if (res < 0)
|
else if (res < 0)
|
||||||
|
@ -2479,7 +2497,7 @@ no_user_pass:
|
||||||
|
|
||||||
static GstRTSPResult
|
static GstRTSPResult
|
||||||
gst_rtsp_client_sink_try_send (GstRTSPClientSink * sink,
|
gst_rtsp_client_sink_try_send (GstRTSPClientSink * sink,
|
||||||
GstRTSPConnection * conn, GstRTSPMessage * request,
|
GstRTSPConnInfo * conninfo, GstRTSPMessage * request,
|
||||||
GstRTSPMessage * response, GstRTSPStatusCode * code)
|
GstRTSPMessage * response, GstRTSPStatusCode * code)
|
||||||
{
|
{
|
||||||
GstRTSPResult res;
|
GstRTSPResult res;
|
||||||
|
@ -2496,14 +2514,14 @@ again:
|
||||||
g_mutex_lock (&sink->send_lock);
|
g_mutex_lock (&sink->send_lock);
|
||||||
|
|
||||||
res =
|
res =
|
||||||
gst_rtsp_client_sink_connection_send (sink, conn, request,
|
gst_rtsp_client_sink_connection_send (sink, conninfo, request,
|
||||||
sink->ptcp_timeout);
|
sink->ptcp_timeout);
|
||||||
if (res < 0) {
|
if (res < 0) {
|
||||||
g_mutex_unlock (&sink->send_lock);
|
g_mutex_unlock (&sink->send_lock);
|
||||||
goto send_error;
|
goto send_error;
|
||||||
}
|
}
|
||||||
|
|
||||||
gst_rtsp_connection_reset_timeout (conn);
|
gst_rtsp_connection_reset_timeout (conninfo->connection);
|
||||||
|
|
||||||
/* See if we should handle the response */
|
/* See if we should handle the response */
|
||||||
if (response == NULL) {
|
if (response == NULL) {
|
||||||
|
@ -2512,7 +2530,7 @@ again:
|
||||||
}
|
}
|
||||||
next:
|
next:
|
||||||
res =
|
res =
|
||||||
gst_rtsp_client_sink_connection_receive (sink, conn, response,
|
gst_rtsp_client_sink_connection_receive (sink, conninfo, response,
|
||||||
sink->ptcp_timeout);
|
sink->ptcp_timeout);
|
||||||
|
|
||||||
g_mutex_unlock (&sink->send_lock);
|
g_mutex_unlock (&sink->send_lock);
|
||||||
|
@ -2526,7 +2544,7 @@ next:
|
||||||
|
|
||||||
switch (response->type) {
|
switch (response->type) {
|
||||||
case GST_RTSP_MESSAGE_REQUEST:
|
case GST_RTSP_MESSAGE_REQUEST:
|
||||||
res = gst_rtsp_client_sink_handle_request (sink, conn, response);
|
res = gst_rtsp_client_sink_handle_request (sink, conninfo, response);
|
||||||
if (res == GST_RTSP_EEOF)
|
if (res == GST_RTSP_EEOF)
|
||||||
goto server_eof;
|
goto server_eof;
|
||||||
else if (res < 0)
|
else if (res < 0)
|
||||||
|
@ -2663,7 +2681,7 @@ gst_rtsp_client_sink_set_state (GstRTSPClientSink * sink, GstState state)
|
||||||
* Returns: #GST_RTSP_OK if the processing was successful.
|
* Returns: #GST_RTSP_OK if the processing was successful.
|
||||||
*/
|
*/
|
||||||
static GstRTSPResult
|
static GstRTSPResult
|
||||||
gst_rtsp_client_sink_send (GstRTSPClientSink * sink, GstRTSPConnection * conn,
|
gst_rtsp_client_sink_send (GstRTSPClientSink * sink, GstRTSPConnInfo * conninfo,
|
||||||
GstRTSPMessage * request, GstRTSPMessage * response,
|
GstRTSPMessage * request, GstRTSPMessage * response,
|
||||||
GstRTSPStatusCode * code)
|
GstRTSPStatusCode * code)
|
||||||
{
|
{
|
||||||
|
@ -2685,7 +2703,7 @@ gst_rtsp_client_sink_send (GstRTSPClientSink * sink, GstRTSPConnection * conn,
|
||||||
method = request->type_data.request.method;
|
method = request->type_data.request.method;
|
||||||
|
|
||||||
if ((res =
|
if ((res =
|
||||||
gst_rtsp_client_sink_try_send (sink, conn, request, response,
|
gst_rtsp_client_sink_try_send (sink, conninfo, request, response,
|
||||||
&int_code)) < 0)
|
&int_code)) < 0)
|
||||||
goto error;
|
goto error;
|
||||||
|
|
||||||
|
@ -2892,7 +2910,7 @@ gst_rtsp_client_sink_connect_to_server (GstRTSPClientSink * sink,
|
||||||
("Retrieving server options"));
|
("Retrieving server options"));
|
||||||
|
|
||||||
if ((res =
|
if ((res =
|
||||||
gst_rtsp_client_sink_send (sink, sink->conninfo.connection, &request,
|
gst_rtsp_client_sink_send (sink, &sink->conninfo, &request,
|
||||||
&response, NULL)) < 0)
|
&response, NULL)) < 0)
|
||||||
goto send_error;
|
goto send_error;
|
||||||
|
|
||||||
|
@ -3074,7 +3092,7 @@ gst_rtsp_client_sink_close (GstRTSPClientSink * sink, gboolean async,
|
||||||
GST_ELEMENT_PROGRESS (sink, CONTINUE, "close", ("Closing stream"));
|
GST_ELEMENT_PROGRESS (sink, CONTINUE, "close", ("Closing stream"));
|
||||||
|
|
||||||
if ((res =
|
if ((res =
|
||||||
gst_rtsp_client_sink_send (sink, info->connection, &request,
|
gst_rtsp_client_sink_send (sink, info, &request,
|
||||||
&response, NULL)) < 0)
|
&response, NULL)) < 0)
|
||||||
goto send_error;
|
goto send_error;
|
||||||
|
|
||||||
|
@ -3486,7 +3504,7 @@ do_send_data (GstBuffer * buffer, guint8 channel,
|
||||||
gst_rtsp_message_take_body (&message, map_info.data, map_info.size);
|
gst_rtsp_message_take_body (&message, map_info.data, map_info.size);
|
||||||
|
|
||||||
res =
|
res =
|
||||||
gst_rtsp_client_sink_try_send (sink, sink->conninfo.connection, &message,
|
gst_rtsp_client_sink_try_send (sink, &sink->conninfo, &message,
|
||||||
NULL, NULL);
|
NULL, NULL);
|
||||||
|
|
||||||
gst_rtsp_message_steal_body (&message, &data, &usize);
|
gst_rtsp_message_steal_body (&message, &data, &usize);
|
||||||
|
@ -3534,7 +3552,7 @@ gst_rtsp_client_sink_setup_streams (GstRTSPClientSink * sink, gboolean async)
|
||||||
GstRTSPStreamContext *context = (GstRTSPStreamContext *) walk->data;
|
GstRTSPStreamContext *context = (GstRTSPStreamContext *) walk->data;
|
||||||
GstRTSPStream *stream;
|
GstRTSPStream *stream;
|
||||||
|
|
||||||
GstRTSPConnection *conn;
|
GstRTSPConnInfo *info;
|
||||||
GstRTSPProfile profiles;
|
GstRTSPProfile profiles;
|
||||||
GstRTSPProfile cur_profile;
|
GstRTSPProfile cur_profile;
|
||||||
gchar *transports;
|
gchar *transports;
|
||||||
|
@ -3571,14 +3589,14 @@ gst_rtsp_client_sink_setup_streams (GstRTSPClientSink * sink, gboolean async)
|
||||||
stream);
|
stream);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
conn = context->conninfo.connection;
|
info = &context->conninfo;
|
||||||
} else {
|
} else {
|
||||||
conn = sink->conninfo.connection;
|
info = &sink->conninfo;
|
||||||
}
|
}
|
||||||
GST_DEBUG_OBJECT (sink, "doing setup of stream %p with %s", stream,
|
GST_DEBUG_OBJECT (sink, "doing setup of stream %p with %s", stream,
|
||||||
context->conninfo.location);
|
context->conninfo.location);
|
||||||
|
|
||||||
conn_socket = gst_rtsp_connection_get_read_socket (conn);
|
conn_socket = gst_rtsp_connection_get_read_socket (info->connection);
|
||||||
sa = g_socket_get_local_address (conn_socket, NULL);
|
sa = g_socket_get_local_address (conn_socket, NULL);
|
||||||
family = g_socket_address_get_family (sa);
|
family = g_socket_address_get_family (sa);
|
||||||
g_object_unref (sa);
|
g_object_unref (sa);
|
||||||
|
@ -3649,7 +3667,7 @@ gst_rtsp_client_sink_setup_streams (GstRTSPClientSink * sink, gboolean async)
|
||||||
context->index));
|
context->index));
|
||||||
|
|
||||||
/* handle the code ourselves */
|
/* handle the code ourselves */
|
||||||
res = gst_rtsp_client_sink_send (sink, conn, &request, &response, &code);
|
res = gst_rtsp_client_sink_send (sink, info, &request, &response, &code);
|
||||||
if (res < 0)
|
if (res < 0)
|
||||||
goto send_error;
|
goto send_error;
|
||||||
|
|
||||||
|
@ -3981,7 +3999,7 @@ gst_rtsp_client_sink_record (GstRTSPClientSink * sink, gboolean async)
|
||||||
("Sending server stream info"));
|
("Sending server stream info"));
|
||||||
|
|
||||||
if ((res =
|
if ((res =
|
||||||
gst_rtsp_client_sink_send (sink, sink->conninfo.connection, &request,
|
gst_rtsp_client_sink_send (sink, &sink->conninfo, &request,
|
||||||
&response, NULL)) < 0)
|
&response, NULL)) < 0)
|
||||||
goto send_error;
|
goto send_error;
|
||||||
|
|
||||||
|
@ -4016,7 +4034,7 @@ gst_rtsp_client_sink_record (GstRTSPClientSink * sink, gboolean async)
|
||||||
if (async)
|
if (async)
|
||||||
GST_ELEMENT_PROGRESS (sink, CONTINUE, "record", ("Starting recording"));
|
GST_ELEMENT_PROGRESS (sink, CONTINUE, "record", ("Starting recording"));
|
||||||
if ((res =
|
if ((res =
|
||||||
gst_rtsp_client_sink_send (sink, sink->conninfo.connection, &request,
|
gst_rtsp_client_sink_send (sink, &sink->conninfo, &request,
|
||||||
&response, NULL)) < 0)
|
&response, NULL)) < 0)
|
||||||
goto send_error;
|
goto send_error;
|
||||||
|
|
||||||
|
@ -4108,7 +4126,7 @@ gst_rtsp_client_sink_pause (GstRTSPClientSink * sink, gboolean async)
|
||||||
* aggregate control */
|
* aggregate control */
|
||||||
for (walk = sink->contexts; walk; walk = g_list_next (walk)) {
|
for (walk = sink->contexts; walk; walk = g_list_next (walk)) {
|
||||||
GstRTSPStreamContext *stream = (GstRTSPStreamContext *) walk->data;
|
GstRTSPStreamContext *stream = (GstRTSPStreamContext *) walk->data;
|
||||||
GstRTSPConnection *conn;
|
GstRTSPConnInfo *info;
|
||||||
const gchar *setup_url;
|
const gchar *setup_url;
|
||||||
|
|
||||||
/* try aggregate control first but do non-aggregate control otherwise */
|
/* try aggregate control first but do non-aggregate control otherwise */
|
||||||
|
@ -4118,9 +4136,9 @@ gst_rtsp_client_sink_pause (GstRTSPClientSink * sink, gboolean async)
|
||||||
continue;
|
continue;
|
||||||
|
|
||||||
if (sink->conninfo.connection) {
|
if (sink->conninfo.connection) {
|
||||||
conn = sink->conninfo.connection;
|
info = &sink->conninfo;
|
||||||
} else if (stream->conninfo.connection) {
|
} else if (stream->conninfo.connection) {
|
||||||
conn = stream->conninfo.connection;
|
info = &stream->conninfo;
|
||||||
} else {
|
} else {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
@ -4135,7 +4153,7 @@ gst_rtsp_client_sink_pause (GstRTSPClientSink * sink, gboolean async)
|
||||||
goto create_request_failed;
|
goto create_request_failed;
|
||||||
|
|
||||||
if ((res =
|
if ((res =
|
||||||
gst_rtsp_client_sink_send (sink, conn, &request, &response,
|
gst_rtsp_client_sink_send (sink, info, &request, &response,
|
||||||
NULL)) < 0)
|
NULL)) < 0)
|
||||||
goto send_error;
|
goto send_error;
|
||||||
|
|
||||||
|
|
|
@ -86,6 +86,9 @@ struct _GstRTSPConnInfo {
|
||||||
GstRTSPConnection *connection;
|
GstRTSPConnection *connection;
|
||||||
gboolean connected;
|
gboolean connected;
|
||||||
gboolean flushing;
|
gboolean flushing;
|
||||||
|
|
||||||
|
GMutex send_lock;
|
||||||
|
GMutex recv_lock;
|
||||||
};
|
};
|
||||||
|
|
||||||
typedef struct _GstRTSPStreamInfo GstRTSPStreamInfo;
|
typedef struct _GstRTSPStreamInfo GstRTSPStreamInfo;
|
||||||
|
|
Loading…
Reference in a new issue