sctp: Make receive/packetout callbacks thread-safe

This commit is contained in:
Sebastian Dröge 2020-01-30 16:06:09 +02:00
parent bff33f3b21
commit fa0a233fa7
4 changed files with 54 additions and 51 deletions

View file

@ -474,7 +474,7 @@ configure_association (GstSctpDec * self)
"local-port", G_BINDING_SYNC_CREATE); "local-port", G_BINDING_SYNC_CREATE);
gst_sctp_association_set_on_packet_received (self->sctp_association, gst_sctp_association_set_on_packet_received (self->sctp_association,
on_receive, self); on_receive, gst_object_ref (self), gst_object_unref);
return TRUE; return TRUE;
error: error:
@ -688,9 +688,8 @@ static void
sctpdec_cleanup (GstSctpDec * self) sctpdec_cleanup (GstSctpDec * self)
{ {
if (self->sctp_association) { if (self->sctp_association) {
/* FIXME: make this threadsafe */ gst_sctp_association_set_on_packet_received (self->sctp_association, NULL,
/* gst_sctp_association_set_on_packet_received (self->sctp_association, NULL, NULL, NULL);
NULL); */
g_signal_handler_disconnect (self->sctp_association, g_signal_handler_disconnect (self->sctp_association,
self->signal_handler_stream_reset); self->signal_handler_stream_reset);
stop_all_srcpad_tasks (self); stop_all_srcpad_tasks (self);

View file

@ -798,7 +798,7 @@ configure_association (GstSctpEnc * self)
"use-sock-stream", G_BINDING_SYNC_CREATE); "use-sock-stream", G_BINDING_SYNC_CREATE);
gst_sctp_association_set_on_packet_out (self->sctp_association, gst_sctp_association_set_on_packet_out (self->sctp_association,
on_sctp_packet_out, self); on_sctp_packet_out, gst_object_ref (self), gst_object_unref);
return TRUE; return TRUE;
error: error:
@ -905,8 +905,8 @@ sctpenc_cleanup (GstSctpEnc * self)
{ {
GstIterator *it; GstIterator *it;
/* FIXME: make this threadsafe */ gst_sctp_association_set_on_packet_out (self->sctp_association, NULL, NULL,
/* gst_sctp_association_set_on_packet_out (self->sctp_association, NULL, NULL); */ NULL);
g_signal_handler_disconnect (self->sctp_association, g_signal_handler_disconnect (self->sctp_association,
self->signal_handler_state_changed); self->signal_handler_state_changed);

View file

@ -190,7 +190,7 @@ gst_sctp_association_init (GstSctpAssociation * self)
self->sctp_ass_sock = NULL; self->sctp_ass_sock = NULL;
self->connection_thread = NULL; self->connection_thread = NULL;
g_mutex_init (&self->association_mutex); g_rec_mutex_init (&self->association_mutex);
self->state = GST_SCTP_ASSOCIATION_STATE_NEW; self->state = GST_SCTP_ASSOCIATION_STATE_NEW;
@ -227,7 +227,7 @@ gst_sctp_association_set_property (GObject * object, guint prop_id,
{ {
GstSctpAssociation *self = GST_SCTP_ASSOCIATION (object); GstSctpAssociation *self = GST_SCTP_ASSOCIATION (object);
g_mutex_lock (&self->association_mutex); g_rec_mutex_lock (&self->association_mutex);
if (self->state != GST_SCTP_ASSOCIATION_STATE_NEW) { if (self->state != GST_SCTP_ASSOCIATION_STATE_NEW) {
switch (prop_id) { switch (prop_id) {
case PROP_LOCAL_PORT: case PROP_LOCAL_PORT:
@ -258,14 +258,14 @@ gst_sctp_association_set_property (GObject * object, guint prop_id,
break; break;
} }
g_mutex_unlock (&self->association_mutex); g_rec_mutex_unlock (&self->association_mutex);
if (prop_id == PROP_LOCAL_PORT || prop_id == PROP_REMOTE_PORT) if (prop_id == PROP_LOCAL_PORT || prop_id == PROP_REMOTE_PORT)
maybe_set_state_to_ready (self); maybe_set_state_to_ready (self);
return; return;
error: error:
g_mutex_unlock (&self->association_mutex); g_rec_mutex_unlock (&self->association_mutex);
} }
static void static void
@ -273,7 +273,7 @@ maybe_set_state_to_ready (GstSctpAssociation * self)
{ {
gboolean signal_ready_state = FALSE; gboolean signal_ready_state = FALSE;
g_mutex_lock (&self->association_mutex); g_rec_mutex_lock (&self->association_mutex);
if ((self->state == GST_SCTP_ASSOCIATION_STATE_NEW) && if ((self->state == GST_SCTP_ASSOCIATION_STATE_NEW) &&
(self->local_port != 0 && self->remote_port != 0) (self->local_port != 0 && self->remote_port != 0)
&& (self->packet_out_cb != NULL) && (self->packet_received_cb != NULL)) { && (self->packet_out_cb != NULL) && (self->packet_received_cb != NULL)) {
@ -281,7 +281,7 @@ maybe_set_state_to_ready (GstSctpAssociation * self)
gst_sctp_association_change_state (self, GST_SCTP_ASSOCIATION_STATE_READY, gst_sctp_association_change_state (self, GST_SCTP_ASSOCIATION_STATE_READY,
FALSE); FALSE);
} }
g_mutex_unlock (&self->association_mutex); g_rec_mutex_unlock (&self->association_mutex);
/* The reason the state is changed twice is that we do not want to change state with /* The reason the state is changed twice is that we do not want to change state with
* notification while the association_mutex is locked. If someone listens * notification while the association_mutex is locked. If someone listens
@ -353,7 +353,7 @@ gst_sctp_association_start (GstSctpAssociation * self)
{ {
gchar *thread_name; gchar *thread_name;
g_mutex_lock (&self->association_mutex); g_rec_mutex_lock (&self->association_mutex);
if (self->state != GST_SCTP_ASSOCIATION_STATE_READY) { if (self->state != GST_SCTP_ASSOCIATION_STATE_READY) {
g_warning ("SCTP association is in wrong state and cannot be started"); g_warning ("SCTP association is in wrong state and cannot be started");
goto configure_required; goto configure_required;
@ -364,7 +364,7 @@ gst_sctp_association_start (GstSctpAssociation * self)
gst_sctp_association_change_state (self, gst_sctp_association_change_state (self,
GST_SCTP_ASSOCIATION_STATE_CONNECTING, FALSE); GST_SCTP_ASSOCIATION_STATE_CONNECTING, FALSE);
g_mutex_unlock (&self->association_mutex); g_rec_mutex_unlock (&self->association_mutex);
/* The reason the state is changed twice is that we do not want to change state with /* The reason the state is changed twice is that we do not want to change state with
* notification while the association_mutex is locked. If someone listens * notification while the association_mutex is locked. If someone listens
@ -379,48 +379,46 @@ gst_sctp_association_start (GstSctpAssociation * self)
return TRUE; return TRUE;
error: error:
g_mutex_unlock (&self->association_mutex); g_rec_mutex_unlock (&self->association_mutex);
gst_sctp_association_change_state (self, GST_SCTP_ASSOCIATION_STATE_ERROR, gst_sctp_association_change_state (self, GST_SCTP_ASSOCIATION_STATE_ERROR,
TRUE); TRUE);
configure_required: configure_required:
g_mutex_unlock (&self->association_mutex); g_rec_mutex_unlock (&self->association_mutex);
return FALSE; return FALSE;
} }
void void
gst_sctp_association_set_on_packet_out (GstSctpAssociation * self, gst_sctp_association_set_on_packet_out (GstSctpAssociation * self,
GstSctpAssociationPacketOutCb packet_out_cb, gpointer user_data) GstSctpAssociationPacketOutCb packet_out_cb, gpointer user_data,
GDestroyNotify destroy_notify)
{ {
g_return_if_fail (GST_SCTP_IS_ASSOCIATION (self)); g_return_if_fail (GST_SCTP_IS_ASSOCIATION (self));
g_mutex_lock (&self->association_mutex); g_rec_mutex_lock (&self->association_mutex);
if (self->state == GST_SCTP_ASSOCIATION_STATE_NEW) { if (self->packet_out_destroy_notify)
self->packet_out_cb = packet_out_cb; self->packet_out_destroy_notify (self->packet_out_user_data);
self->packet_out_user_data = user_data; self->packet_out_cb = packet_out_cb;
} else { self->packet_out_user_data = user_data;
/* This is to be thread safe. The Association might try to write to the closure already */ self->packet_out_destroy_notify = destroy_notify;
g_warning ("It is not possible to change packet callback in this state"); g_rec_mutex_unlock (&self->association_mutex);
}
g_mutex_unlock (&self->association_mutex);
maybe_set_state_to_ready (self); maybe_set_state_to_ready (self);
} }
void void
gst_sctp_association_set_on_packet_received (GstSctpAssociation * self, gst_sctp_association_set_on_packet_received (GstSctpAssociation * self,
GstSctpAssociationPacketReceivedCb packet_received_cb, gpointer user_data) GstSctpAssociationPacketReceivedCb packet_received_cb, gpointer user_data,
GDestroyNotify destroy_notify)
{ {
g_return_if_fail (GST_SCTP_IS_ASSOCIATION (self)); g_return_if_fail (GST_SCTP_IS_ASSOCIATION (self));
g_mutex_lock (&self->association_mutex); g_rec_mutex_lock (&self->association_mutex);
if (self->state == GST_SCTP_ASSOCIATION_STATE_NEW) { if (self->packet_received_destroy_notify)
self->packet_received_cb = packet_received_cb; self->packet_received_destroy_notify (self->packet_received_user_data);
self->packet_received_user_data = user_data; self->packet_received_cb = packet_received_cb;
} else { self->packet_received_user_data = user_data;
/* This is to be thread safe. The Association might try to write to the closure already */ self->packet_received_destroy_notify = destroy_notify;
g_warning ("It is not possible to change receive callback in this state"); g_rec_mutex_unlock (&self->association_mutex);
}
g_mutex_unlock (&self->association_mutex);
maybe_set_state_to_ready (self); maybe_set_state_to_ready (self);
} }
@ -442,7 +440,7 @@ gst_sctp_association_send_data (GstSctpAssociation * self, guint8 * buf,
gboolean result = FALSE; gboolean result = FALSE;
struct sockaddr_conn remote_addr; struct sockaddr_conn remote_addr;
g_mutex_lock (&self->association_mutex); g_rec_mutex_lock (&self->association_mutex);
if (self->state != GST_SCTP_ASSOCIATION_STATE_CONNECTED) if (self->state != GST_SCTP_ASSOCIATION_STATE_CONNECTED)
goto end; goto end;
@ -483,7 +481,7 @@ gst_sctp_association_send_data (GstSctpAssociation * self, guint8 * buf,
result = TRUE; result = TRUE;
end: end:
g_mutex_unlock (&self->association_mutex); g_rec_mutex_unlock (&self->association_mutex);
return result; return result;
} }
@ -500,10 +498,10 @@ gst_sctp_association_reset_stream (GstSctpAssociation * self, guint16 stream_id)
srs->srs_number_streams = 1; srs->srs_number_streams = 1;
srs->srs_stream_list[0] = stream_id; srs->srs_stream_list[0] = stream_id;
g_mutex_lock (&self->association_mutex); g_rec_mutex_lock (&self->association_mutex);
usrsctp_setsockopt (self->sctp_ass_sock, IPPROTO_SCTP, SCTP_RESET_STREAMS, usrsctp_setsockopt (self->sctp_ass_sock, IPPROTO_SCTP, SCTP_RESET_STREAMS,
srs, length); srs, length);
g_mutex_unlock (&self->association_mutex); g_rec_mutex_unlock (&self->association_mutex);
g_free (srs); g_free (srs);
} }
@ -511,13 +509,13 @@ gst_sctp_association_reset_stream (GstSctpAssociation * self, guint16 stream_id)
void void
gst_sctp_association_force_close (GstSctpAssociation * self) gst_sctp_association_force_close (GstSctpAssociation * self)
{ {
g_mutex_lock (&self->association_mutex); g_rec_mutex_lock (&self->association_mutex);
if (self->sctp_ass_sock) { if (self->sctp_ass_sock) {
usrsctp_close (self->sctp_ass_sock); usrsctp_close (self->sctp_ass_sock);
self->sctp_ass_sock = NULL; self->sctp_ass_sock = NULL;
} }
g_mutex_unlock (&self->association_mutex); g_rec_mutex_unlock (&self->association_mutex);
} }
static struct socket * static struct socket *
@ -634,7 +632,7 @@ client_role_connect (GstSctpAssociation * self)
struct sockaddr_conn addr; struct sockaddr_conn addr;
gint ret; gint ret;
g_mutex_lock (&self->association_mutex); g_rec_mutex_lock (&self->association_mutex);
addr = get_sctp_socket_address (self, self->local_port); addr = get_sctp_socket_address (self, self->local_port);
ret = ret =
usrsctp_bind (self->sctp_ass_sock, (struct sockaddr *) &addr, usrsctp_bind (self->sctp_ass_sock, (struct sockaddr *) &addr,
@ -652,10 +650,10 @@ client_role_connect (GstSctpAssociation * self)
g_warning ("usrsctp_connect() error: (%u) %s", errno, strerror (errno)); g_warning ("usrsctp_connect() error: (%u) %s", errno, strerror (errno));
goto error; goto error;
} }
g_mutex_unlock (&self->association_mutex); g_rec_mutex_unlock (&self->association_mutex);
return TRUE; return TRUE;
error: error:
g_mutex_unlock (&self->association_mutex); g_rec_mutex_unlock (&self->association_mutex);
return FALSE; return FALSE;
} }
@ -665,9 +663,11 @@ sctp_packet_out (void *addr, void *buffer, size_t length, guint8 tos,
{ {
GstSctpAssociation *self = GST_SCTP_ASSOCIATION (addr); GstSctpAssociation *self = GST_SCTP_ASSOCIATION (addr);
g_rec_mutex_lock (&self->association_mutex);
if (self->packet_out_cb) { if (self->packet_out_cb) {
self->packet_out_cb (self, buffer, length, self->packet_out_user_data); self->packet_out_cb (self, buffer, length, self->packet_out_user_data);
} }
g_rec_mutex_unlock (&self->association_mutex);
return 0; return 0;
} }
@ -769,7 +769,7 @@ handle_association_changed (GstSctpAssociation * self,
switch (sac->sac_state) { switch (sac->sac_state) {
case SCTP_COMM_UP: case SCTP_COMM_UP:
g_log (G_LOG_DOMAIN, G_LOG_LEVEL_INFO, "SCTP_COMM_UP()"); g_log (G_LOG_DOMAIN, G_LOG_LEVEL_INFO, "SCTP_COMM_UP()");
g_mutex_lock (&self->association_mutex); g_rec_mutex_lock (&self->association_mutex);
if (self->state == GST_SCTP_ASSOCIATION_STATE_CONNECTING) { if (self->state == GST_SCTP_ASSOCIATION_STATE_CONNECTING) {
change_state = TRUE; change_state = TRUE;
new_state = GST_SCTP_ASSOCIATION_STATE_CONNECTED; new_state = GST_SCTP_ASSOCIATION_STATE_CONNECTED;
@ -779,7 +779,7 @@ handle_association_changed (GstSctpAssociation * self,
} else { } else {
g_warning ("SCTP association in unexpected state"); g_warning ("SCTP association in unexpected state");
} }
g_mutex_unlock (&self->association_mutex); g_rec_mutex_unlock (&self->association_mutex);
break; break;
case SCTP_COMM_LOST: case SCTP_COMM_LOST:
g_warning ("SCTP event SCTP_COMM_LOST received"); g_warning ("SCTP event SCTP_COMM_LOST received");
@ -824,10 +824,12 @@ static void
handle_message (GstSctpAssociation * self, guint8 * data, guint32 datalen, handle_message (GstSctpAssociation * self, guint8 * data, guint32 datalen,
guint16 stream_id, guint32 ppid) guint16 stream_id, guint32 ppid)
{ {
g_rec_mutex_lock (&self->association_mutex);
if (self->packet_received_cb) { if (self->packet_received_cb) {
self->packet_received_cb (self, data, datalen, stream_id, ppid, self->packet_received_cb (self, data, datalen, stream_id, ppid,
self->packet_received_user_data); self->packet_received_user_data);
} }
g_rec_mutex_unlock (&self->association_mutex);
} }
static void static void

View file

@ -78,7 +78,7 @@ struct _GstSctpAssociation
gboolean use_sock_stream; gboolean use_sock_stream;
struct socket *sctp_ass_sock; struct socket *sctp_ass_sock;
GMutex association_mutex; GRecMutex association_mutex;
GstSctpAssociationState state; GstSctpAssociationState state;
@ -86,9 +86,11 @@ struct _GstSctpAssociation
GstSctpAssociationPacketReceivedCb packet_received_cb; GstSctpAssociationPacketReceivedCb packet_received_cb;
gpointer packet_received_user_data; gpointer packet_received_user_data;
GDestroyNotify packet_received_destroy_notify;
GstSctpAssociationPacketOutCb packet_out_cb; GstSctpAssociationPacketOutCb packet_out_cb;
gpointer packet_out_user_data; gpointer packet_out_user_data;
GDestroyNotify packet_out_destroy_notify;
}; };
struct _GstSctpAssociationClass struct _GstSctpAssociationClass
@ -105,9 +107,9 @@ GstSctpAssociation *gst_sctp_association_get (guint32 association_id);
gboolean gst_sctp_association_start (GstSctpAssociation * self); gboolean gst_sctp_association_start (GstSctpAssociation * self);
void gst_sctp_association_set_on_packet_out (GstSctpAssociation * self, void gst_sctp_association_set_on_packet_out (GstSctpAssociation * self,
GstSctpAssociationPacketOutCb packet_out_cb, gpointer user_data); GstSctpAssociationPacketOutCb packet_out_cb, gpointer user_data, GDestroyNotify destroy_notify);
void gst_sctp_association_set_on_packet_received (GstSctpAssociation * self, void gst_sctp_association_set_on_packet_received (GstSctpAssociation * self,
GstSctpAssociationPacketReceivedCb packet_received_cb, gpointer user_data); GstSctpAssociationPacketReceivedCb packet_received_cb, gpointer user_data, GDestroyNotify destroy_notify);
void gst_sctp_association_incoming_packet (GstSctpAssociation * self, void gst_sctp_association_incoming_packet (GstSctpAssociation * self,
guint8 * buf, guint32 length); guint8 * buf, guint32 length);
gboolean gst_sctp_association_send_data (GstSctpAssociation * self, gboolean gst_sctp_association_send_data (GstSctpAssociation * self,