/* * Copyright (c) 2015, Collabora Ltd. * * Redistribution and use in source and binary forms, with or without modification, * are permitted provided that the following conditions are met: * * 1. Redistributions of source code must retain the above copyright notice, this * list of conditions and the following disclaimer. * * 2. Redistributions in binary form must reproduce the above copyright notice, this * list of conditions and the following disclaimer in the documentation and/or other * materials provided with the distribution. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. * IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY * OF SUCH DAMAGE. */ #ifdef HAVE_CONFIG_H #include "config.h" #endif #include "sctpassociation.h" #include #include #include #include GST_DEBUG_CATEGORY_STATIC (gst_sctp_association_debug_category); #define GST_CAT_DEFAULT gst_sctp_association_debug_category #define GST_SCTP_ASSOCIATION_STATE_TYPE (gst_sctp_association_state_get_type()) static GType gst_sctp_association_state_get_type (void) { static const GEnumValue values[] = { {GST_SCTP_ASSOCIATION_STATE_NEW, "state-new", "state-new"}, {GST_SCTP_ASSOCIATION_STATE_READY, "state-ready", "state-ready"}, {GST_SCTP_ASSOCIATION_STATE_CONNECTING, "state-connecting", "state-connecting"}, {GST_SCTP_ASSOCIATION_STATE_CONNECTED, "state-connected", "state-connected"}, {GST_SCTP_ASSOCIATION_STATE_DISCONNECTING, "state-disconnecting", "state-disconnecting"}, {GST_SCTP_ASSOCIATION_STATE_DISCONNECTED, "state-disconnected", "state-disconnected"}, {GST_SCTP_ASSOCIATION_STATE_ERROR, "state-error", "state-error"}, {0, NULL, NULL} }; static volatile GType id = 0; if (g_once_init_enter ((gsize *) & id)) { GType _id; _id = g_enum_register_static ("GstSctpAssociationState", values); g_once_init_leave ((gsize *) & id, _id); } return id; } G_DEFINE_TYPE (GstSctpAssociation, gst_sctp_association, G_TYPE_OBJECT); enum { SIGNAL_STREAM_RESET, LAST_SIGNAL }; enum { PROP_0, PROP_ASSOCIATION_ID, PROP_LOCAL_PORT, PROP_REMOTE_PORT, PROP_STATE, PROP_USE_SOCK_STREAM, NUM_PROPERTIES }; static guint signals[LAST_SIGNAL] = { 0 }; static GParamSpec *properties[NUM_PROPERTIES]; #define DEFAULT_NUMBER_OF_SCTP_STREAMS 1024 #define DEFAULT_LOCAL_SCTP_PORT 0 #define DEFAULT_REMOTE_SCTP_PORT 0 static GHashTable *associations = NULL; G_LOCK_DEFINE_STATIC (associations_lock); static guint32 number_of_associations = 0; /* Interface implementations */ static void gst_sctp_association_finalize (GObject * object); static void gst_sctp_association_set_property (GObject * object, guint prop_id, const GValue * value, GParamSpec * pspec); static void gst_sctp_association_get_property (GObject * object, guint prop_id, GValue * value, GParamSpec * pspec); static struct socket *create_sctp_socket (GstSctpAssociation * gst_sctp_association); static struct sockaddr_conn get_sctp_socket_address (GstSctpAssociation * gst_sctp_association, guint16 port); static gpointer connection_thread_func (GstSctpAssociation * self); static gboolean client_role_connect (GstSctpAssociation * self); static int sctp_packet_out (void *addr, void *buffer, size_t length, guint8 tos, guint8 set_df); static int receive_cb (struct socket *sock, union sctp_sockstore addr, void *data, size_t datalen, struct sctp_rcvinfo rcv_info, gint flags, void *ulp_info); static void handle_notification (GstSctpAssociation * self, const union sctp_notification *notification, size_t length); static void handle_association_changed (GstSctpAssociation * self, const struct sctp_assoc_change *sac); static void handle_stream_reset_event (GstSctpAssociation * self, const struct sctp_stream_reset_event *ssr); static void handle_message (GstSctpAssociation * self, guint8 * data, guint32 datalen, guint16 stream_id, guint32 ppid); static void maybe_set_state_to_ready (GstSctpAssociation * self); static gboolean gst_sctp_association_change_state (GstSctpAssociation * self, GstSctpAssociationState new_state, gboolean notify); static void gst_sctp_association_class_init (GstSctpAssociationClass * klass) { GObjectClass *gobject_class; gobject_class = (GObjectClass *) klass; gobject_class->finalize = gst_sctp_association_finalize; gobject_class->set_property = gst_sctp_association_set_property; gobject_class->get_property = gst_sctp_association_get_property; signals[SIGNAL_STREAM_RESET] = g_signal_new ("stream-reset", G_OBJECT_CLASS_TYPE (klass), G_SIGNAL_RUN_FIRST, G_STRUCT_OFFSET (GstSctpAssociationClass, on_sctp_stream_reset), NULL, NULL, NULL, G_TYPE_NONE, 1, G_TYPE_UINT); properties[PROP_ASSOCIATION_ID] = g_param_spec_uint ("association-id", "The SCTP association-id", "The SCTP association-id.", 0, G_MAXUSHORT, DEFAULT_LOCAL_SCTP_PORT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS); properties[PROP_LOCAL_PORT] = g_param_spec_uint ("local-port", "Local SCTP", "The local SCTP port for this association", 0, G_MAXUSHORT, DEFAULT_LOCAL_SCTP_PORT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS); properties[PROP_REMOTE_PORT] = g_param_spec_uint ("remote-port", "Remote SCTP", "The remote SCTP port for this association", 0, G_MAXUSHORT, DEFAULT_LOCAL_SCTP_PORT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS); properties[PROP_STATE] = g_param_spec_enum ("state", "SCTP Association state", "The state of the SCTP association", GST_SCTP_ASSOCIATION_STATE_TYPE, GST_SCTP_ASSOCIATION_STATE_NEW, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS); properties[PROP_USE_SOCK_STREAM] = g_param_spec_boolean ("use-sock-stream", "Use sock-stream", "When set to TRUE, a sequenced, reliable, connection-based connection is used." "When TRUE the partial reliability parameters of the channel is ignored.", FALSE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS); g_object_class_install_properties (gobject_class, NUM_PROPERTIES, properties); } static void gst_sctp_association_init (GstSctpAssociation * self) { /* No need to lock mutex here as long as the function is only called from gst_sctp_association_get */ if (number_of_associations == 0) { usrsctp_init (0, sctp_packet_out, g_print); /* Explicit Congestion Notification */ usrsctp_sysctl_set_sctp_ecn_enable (0); usrsctp_sysctl_set_sctp_nr_outgoing_streams_default (DEFAULT_NUMBER_OF_SCTP_STREAMS); } number_of_associations++; self->local_port = DEFAULT_LOCAL_SCTP_PORT; self->remote_port = DEFAULT_REMOTE_SCTP_PORT; self->sctp_ass_sock = NULL; self->connection_thread = NULL; g_rec_mutex_init (&self->association_mutex); self->state = GST_SCTP_ASSOCIATION_STATE_NEW; self->use_sock_stream = FALSE; usrsctp_register_address ((void *) self); } static void gst_sctp_association_finalize (GObject * object) { GstSctpAssociation *self = GST_SCTP_ASSOCIATION (object); G_LOCK (associations_lock); g_hash_table_remove (associations, GUINT_TO_POINTER (self->association_id)); usrsctp_deregister_address ((void *) self); number_of_associations--; if (number_of_associations == 0) { usrsctp_finish (); } G_UNLOCK (associations_lock); if (self->connection_thread) g_thread_join (self->connection_thread); G_OBJECT_CLASS (gst_sctp_association_parent_class)->finalize (object); } static void gst_sctp_association_set_property (GObject * object, guint prop_id, const GValue * value, GParamSpec * pspec) { GstSctpAssociation *self = GST_SCTP_ASSOCIATION (object); g_rec_mutex_lock (&self->association_mutex); if (self->state != GST_SCTP_ASSOCIATION_STATE_NEW) { switch (prop_id) { case PROP_LOCAL_PORT: case PROP_REMOTE_PORT: GST_ERROR_OBJECT (self, "These properties cannot be set in this state"); goto error; } } switch (prop_id) { case PROP_ASSOCIATION_ID: self->association_id = g_value_get_uint (value); break; case PROP_LOCAL_PORT: self->local_port = g_value_get_uint (value); break; case PROP_REMOTE_PORT: self->remote_port = g_value_get_uint (value); break; case PROP_STATE: self->state = g_value_get_enum (value); break; case PROP_USE_SOCK_STREAM: self->use_sock_stream = g_value_get_boolean (value); break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (self, prop_id, pspec); break; } g_rec_mutex_unlock (&self->association_mutex); if (prop_id == PROP_LOCAL_PORT || prop_id == PROP_REMOTE_PORT) maybe_set_state_to_ready (self); return; error: g_rec_mutex_unlock (&self->association_mutex); } static void maybe_set_state_to_ready (GstSctpAssociation * self) { gboolean signal_ready_state = FALSE; g_rec_mutex_lock (&self->association_mutex); if ((self->state == GST_SCTP_ASSOCIATION_STATE_NEW) && (self->local_port != 0 && self->remote_port != 0) && (self->packet_out_cb != NULL) && (self->packet_received_cb != NULL)) { signal_ready_state = gst_sctp_association_change_state (self, GST_SCTP_ASSOCIATION_STATE_READY, FALSE); } g_rec_mutex_unlock (&self->association_mutex); if (signal_ready_state) g_object_notify_by_pspec (G_OBJECT (self), properties[PROP_STATE]); } static void gst_sctp_association_get_property (GObject * object, guint prop_id, GValue * value, GParamSpec * pspec) { GstSctpAssociation *self = GST_SCTP_ASSOCIATION (object); switch (prop_id) { case PROP_ASSOCIATION_ID: g_value_set_uint (value, self->association_id); break; case PROP_LOCAL_PORT: g_value_set_uint (value, self->local_port); break; case PROP_REMOTE_PORT: g_value_set_uint (value, self->remote_port); break; case PROP_STATE: g_value_set_enum (value, self->state); break; case PROP_USE_SOCK_STREAM: g_value_set_boolean (value, self->use_sock_stream); break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (self, prop_id, pspec); break; } } /* Public functions */ GstSctpAssociation * gst_sctp_association_get (guint32 association_id) { GstSctpAssociation *association; G_LOCK (associations_lock); GST_DEBUG_CATEGORY_INIT (gst_sctp_association_debug_category, "sctpassociation", 0, "debug category for sctpassociation"); if (!associations) { associations = g_hash_table_new_full (g_direct_hash, g_direct_equal, NULL, NULL); } association = g_hash_table_lookup (associations, GUINT_TO_POINTER (association_id)); if (!association) { association = g_object_new (GST_SCTP_TYPE_ASSOCIATION, "association-id", association_id, NULL); g_hash_table_insert (associations, GUINT_TO_POINTER (association_id), association); } else { g_object_ref (association); } G_UNLOCK (associations_lock); return association; } gboolean gst_sctp_association_start (GstSctpAssociation * self) { gchar *thread_name; gboolean signal_state = FALSE; g_rec_mutex_lock (&self->association_mutex); if (self->state != GST_SCTP_ASSOCIATION_STATE_READY) { GST_WARNING_OBJECT (self, "SCTP association is in wrong state and cannot be started"); goto configure_required; } if ((self->sctp_ass_sock = create_sctp_socket (self)) == NULL) goto error; signal_state |= gst_sctp_association_change_state (self, GST_SCTP_ASSOCIATION_STATE_CONNECTING, FALSE); g_rec_mutex_unlock (&self->association_mutex); thread_name = g_strdup_printf ("connection_thread_%u", self->association_id); self->connection_thread = g_thread_new (thread_name, (GThreadFunc) connection_thread_func, self); g_free (thread_name); if (signal_state) g_object_notify_by_pspec (G_OBJECT (self), properties[PROP_STATE]); return TRUE; error: g_rec_mutex_unlock (&self->association_mutex); gst_sctp_association_change_state (self, GST_SCTP_ASSOCIATION_STATE_ERROR, TRUE); return FALSE; configure_required: g_rec_mutex_unlock (&self->association_mutex); return FALSE; } void gst_sctp_association_set_on_packet_out (GstSctpAssociation * self, GstSctpAssociationPacketOutCb packet_out_cb, gpointer user_data, GDestroyNotify destroy_notify) { g_return_if_fail (GST_SCTP_IS_ASSOCIATION (self)); g_rec_mutex_lock (&self->association_mutex); if (self->packet_out_destroy_notify) self->packet_out_destroy_notify (self->packet_out_user_data); self->packet_out_cb = packet_out_cb; self->packet_out_user_data = user_data; self->packet_out_destroy_notify = destroy_notify; g_rec_mutex_unlock (&self->association_mutex); maybe_set_state_to_ready (self); } void gst_sctp_association_set_on_packet_received (GstSctpAssociation * self, GstSctpAssociationPacketReceivedCb packet_received_cb, gpointer user_data, GDestroyNotify destroy_notify) { g_return_if_fail (GST_SCTP_IS_ASSOCIATION (self)); g_rec_mutex_lock (&self->association_mutex); if (self->packet_received_destroy_notify) self->packet_received_destroy_notify (self->packet_received_user_data); self->packet_received_cb = packet_received_cb; self->packet_received_user_data = user_data; self->packet_received_destroy_notify = destroy_notify; g_rec_mutex_unlock (&self->association_mutex); maybe_set_state_to_ready (self); } void gst_sctp_association_incoming_packet (GstSctpAssociation * self, const guint8 * buf, guint32 length) { usrsctp_conninput ((void *) self, (const void *) buf, (size_t) length, 0); } GstFlowReturn gst_sctp_association_send_data (GstSctpAssociation * self, const guint8 * buf, guint32 length, guint16 stream_id, guint32 ppid, gboolean ordered, GstSctpAssociationPartialReliability pr, guint32 reliability_param, guint32 * bytes_sent_) { GstFlowReturn flow_ret; struct sctp_sendv_spa spa; gint32 bytes_sent = 0; struct sockaddr_conn remote_addr; g_rec_mutex_lock (&self->association_mutex); if (self->state != GST_SCTP_ASSOCIATION_STATE_CONNECTED) { if (self->state == GST_SCTP_ASSOCIATION_STATE_DISCONNECTED || self->state == GST_SCTP_ASSOCIATION_STATE_DISCONNECTING) { GST_ERROR_OBJECT (self, "Disconnected"); flow_ret = GST_FLOW_EOS; goto end; } else { GST_ERROR_OBJECT (self, "Association not connected yet"); flow_ret = GST_FLOW_ERROR; goto end; } } memset (&spa, 0, sizeof (spa)); spa.sendv_sndinfo.snd_ppid = g_htonl (ppid); spa.sendv_sndinfo.snd_sid = stream_id; spa.sendv_sndinfo.snd_flags = ordered ? 0 : SCTP_UNORDERED; spa.sendv_sndinfo.snd_context = 0; spa.sendv_sndinfo.snd_assoc_id = 0; spa.sendv_flags = SCTP_SEND_SNDINFO_VALID; if (pr != GST_SCTP_ASSOCIATION_PARTIAL_RELIABILITY_NONE) { spa.sendv_flags |= SCTP_SEND_PRINFO_VALID; spa.sendv_prinfo.pr_value = g_htonl (reliability_param); if (pr == GST_SCTP_ASSOCIATION_PARTIAL_RELIABILITY_TTL) spa.sendv_prinfo.pr_policy = SCTP_PR_SCTP_TTL; else if (pr == GST_SCTP_ASSOCIATION_PARTIAL_RELIABILITY_RTX) spa.sendv_prinfo.pr_policy = SCTP_PR_SCTP_RTX; else if (pr == GST_SCTP_ASSOCIATION_PARTIAL_RELIABILITY_BUF) spa.sendv_prinfo.pr_policy = SCTP_PR_SCTP_BUF; } remote_addr = get_sctp_socket_address (self, self->remote_port); bytes_sent = usrsctp_sendv (self->sctp_ass_sock, buf, length, (struct sockaddr *) &remote_addr, 1, (void *) &spa, (socklen_t) sizeof (struct sctp_sendv_spa), SCTP_SENDV_SPA, 0); if (bytes_sent < 0) { if (errno == EAGAIN || errno == EWOULDBLOCK) { bytes_sent = 0; /* Resending this buffer is taken care of by the gstsctpenc */ flow_ret = GST_FLOW_OK; goto end; } else { GST_ERROR_OBJECT (self, "Error sending data on stream %u: (%u) %s", stream_id, errno, g_strerror (errno)); flow_ret = GST_FLOW_ERROR; goto end; } } flow_ret = GST_FLOW_OK; end: g_rec_mutex_unlock (&self->association_mutex); if (bytes_sent_) *bytes_sent_ = bytes_sent; return flow_ret; } void gst_sctp_association_reset_stream (GstSctpAssociation * self, guint16 stream_id) { struct sctp_reset_streams *srs; socklen_t length; length = (socklen_t) (sizeof (struct sctp_reset_streams) + sizeof (guint16)); srs = (struct sctp_reset_streams *) g_malloc0 (length); srs->srs_flags = SCTP_STREAM_RESET_OUTGOING; srs->srs_number_streams = 1; srs->srs_stream_list[0] = stream_id; g_rec_mutex_lock (&self->association_mutex); usrsctp_setsockopt (self->sctp_ass_sock, IPPROTO_SCTP, SCTP_RESET_STREAMS, srs, length); g_rec_mutex_unlock (&self->association_mutex); g_free (srs); } void gst_sctp_association_force_close (GstSctpAssociation * self) { g_rec_mutex_lock (&self->association_mutex); if (self->sctp_ass_sock) { usrsctp_close (self->sctp_ass_sock); self->sctp_ass_sock = NULL; } g_rec_mutex_unlock (&self->association_mutex); gst_sctp_association_change_state (self, GST_SCTP_ASSOCIATION_STATE_DISCONNECTED, TRUE); } static struct socket * create_sctp_socket (GstSctpAssociation * self) { struct socket *sock; struct linger l; struct sctp_event event; struct sctp_assoc_value stream_reset; int value = 1; guint16 event_types[] = { SCTP_ASSOC_CHANGE, SCTP_PEER_ADDR_CHANGE, SCTP_REMOTE_ERROR, SCTP_SEND_FAILED, SCTP_SHUTDOWN_EVENT, SCTP_ADAPTATION_INDICATION, /*SCTP_PARTIAL_DELIVERY_EVENT, */ /*SCTP_AUTHENTICATION_EVENT, */ SCTP_STREAM_RESET_EVENT, /*SCTP_SENDER_DRY_EVENT, */ /*SCTP_NOTIFICATIONS_STOPPED_EVENT, */ /*SCTP_ASSOC_RESET_EVENT, */ SCTP_STREAM_CHANGE_EVENT }; guint32 i; guint sock_type = self->use_sock_stream ? SOCK_STREAM : SOCK_SEQPACKET; if ((sock = usrsctp_socket (AF_CONN, sock_type, IPPROTO_SCTP, receive_cb, NULL, 0, (void *) self)) == NULL) { GST_ERROR_OBJECT (self, "Could not open SCTP socket: (%u) %s", errno, g_strerror (errno)); goto error; } /* Properly return errors */ if (usrsctp_set_non_blocking (sock, 1) < 0) { GST_ERROR_OBJECT (self, "Could not set non-blocking mode on SCTP socket: (%u) %s", errno, g_strerror (errno)); goto error; } memset (&l, 0, sizeof (l)); l.l_onoff = 1; l.l_linger = 0; if (usrsctp_setsockopt (sock, SOL_SOCKET, SO_LINGER, (const void *) &l, (socklen_t) sizeof (struct linger)) < 0) { GST_ERROR_OBJECT (self, "Could not set SO_LINGER on SCTP socket: (%u) %s", errno, g_strerror (errno)); goto error; } if (usrsctp_setsockopt (sock, IPPROTO_SCTP, SCTP_NODELAY, &value, sizeof (int))) { GST_ERROR_OBJECT (self, "Could not set SCTP_NODELAY: (%u) %s", errno, g_strerror (errno)); goto error; } memset (&stream_reset, 0, sizeof (stream_reset)); stream_reset.assoc_id = SCTP_ALL_ASSOC; stream_reset.assoc_value = 1; if (usrsctp_setsockopt (sock, IPPROTO_SCTP, SCTP_ENABLE_STREAM_RESET, &stream_reset, sizeof (stream_reset))) { GST_ERROR_OBJECT (self, "Could not set SCTP_ENABLE_STREAM_RESET: (%u) %s", errno, g_strerror (errno)); goto error; } memset (&event, 0, sizeof (event)); event.se_assoc_id = SCTP_ALL_ASSOC; event.se_on = 1; for (i = 0; i < sizeof (event_types) / sizeof (event_types[0]); i++) { event.se_type = event_types[i]; if (usrsctp_setsockopt (sock, IPPROTO_SCTP, SCTP_EVENT, &event, sizeof (event)) < 0) { GST_ERROR_OBJECT (self, "Failed to register event %u: (%u) %s", event_types[i], errno, g_strerror (errno)); } } return sock; error: if (sock) usrsctp_close (sock); return NULL; } static struct sockaddr_conn get_sctp_socket_address (GstSctpAssociation * gst_sctp_association, guint16 port) { struct sockaddr_conn addr; memset ((void *) &addr, 0, sizeof (struct sockaddr_conn)); #ifdef __APPLE__ addr.sconn_len = sizeof (struct sockaddr_conn); #endif addr.sconn_family = AF_CONN; addr.sconn_port = g_htons (port); addr.sconn_addr = (void *) gst_sctp_association; return addr; } static gpointer connection_thread_func (GstSctpAssociation * self) { /* TODO: Support both server and client role */ if (!client_role_connect (self)) gst_sctp_association_change_state (self, GST_SCTP_ASSOCIATION_STATE_ERROR, TRUE); return NULL; } static gboolean client_role_connect (GstSctpAssociation * self) { struct sockaddr_conn addr; gint ret; g_rec_mutex_lock (&self->association_mutex); addr = get_sctp_socket_address (self, self->local_port); ret = usrsctp_bind (self->sctp_ass_sock, (struct sockaddr *) &addr, sizeof (struct sockaddr_conn)); if (ret < 0) { GST_ERROR_OBJECT (self, "usrsctp_bind() error: (%u) %s", errno, g_strerror (errno)); goto error; } addr = get_sctp_socket_address (self, self->remote_port); ret = usrsctp_connect (self->sctp_ass_sock, (struct sockaddr *) &addr, sizeof (struct sockaddr_conn)); if (ret < 0 && errno != EINPROGRESS) { GST_ERROR_OBJECT (self, "usrsctp_connect() error: (%u) %s", errno, g_strerror (errno)); goto error; } g_rec_mutex_unlock (&self->association_mutex); return TRUE; error: g_rec_mutex_unlock (&self->association_mutex); return FALSE; } static int sctp_packet_out (void *addr, void *buffer, size_t length, guint8 tos, guint8 set_df) { GstSctpAssociation *self = GST_SCTP_ASSOCIATION (addr); g_rec_mutex_lock (&self->association_mutex); if (self->packet_out_cb) { self->packet_out_cb (self, buffer, length, self->packet_out_user_data); } g_rec_mutex_unlock (&self->association_mutex); return 0; } static int receive_cb (struct socket *sock, union sctp_sockstore addr, void *data, size_t datalen, struct sctp_rcvinfo rcv_info, gint flags, void *ulp_info) { GstSctpAssociation *self = GST_SCTP_ASSOCIATION (ulp_info); if (!data) { /* Not sure if this can happend. */ GST_WARNING_OBJECT (self, "Received empty data buffer"); } else { if (flags & MSG_NOTIFICATION) { handle_notification (self, (const union sctp_notification *) data, datalen); /* We use this instead of a bare `free()` so that we use the `free` from * the C runtime that usrsctp was built with. This makes a difference on * Windows where libusrstcp and GStreamer can be linked to two different * CRTs. */ usrsctp_freedumpbuffer (data); } else { handle_message (self, data, datalen, rcv_info.rcv_sid, ntohl (rcv_info.rcv_ppid)); } } return 1; } static void handle_notification (GstSctpAssociation * self, const union sctp_notification *notification, size_t length) { g_assert (notification->sn_header.sn_length == length); switch (notification->sn_header.sn_type) { case SCTP_ASSOC_CHANGE: GST_DEBUG_OBJECT (self, "Event: SCTP_ASSOC_CHANGE"); handle_association_changed (self, ¬ification->sn_assoc_change); break; case SCTP_PEER_ADDR_CHANGE: GST_DEBUG_OBJECT (self, "Event: SCTP_PEER_ADDR_CHANGE"); break; case SCTP_REMOTE_ERROR: GST_ERROR_OBJECT (self, "Event: SCTP_REMOTE_ERROR (%u)", notification->sn_remote_error.sre_error); break; case SCTP_SEND_FAILED: GST_ERROR_OBJECT (self, "Event: SCTP_SEND_FAILED"); break; case SCTP_SHUTDOWN_EVENT: GST_DEBUG_OBJECT (self, "Event: SCTP_SHUTDOWN_EVENT"); gst_sctp_association_change_state (self, GST_SCTP_ASSOCIATION_STATE_DISCONNECTING, TRUE); break; case SCTP_ADAPTATION_INDICATION: GST_DEBUG_OBJECT (self, "Event: SCTP_ADAPTATION_INDICATION"); break; case SCTP_PARTIAL_DELIVERY_EVENT: GST_DEBUG_OBJECT (self, "Event: SCTP_PARTIAL_DELIVERY_EVENT"); break; case SCTP_AUTHENTICATION_EVENT: GST_DEBUG_OBJECT (self, "Event: SCTP_AUTHENTICATION_EVENT"); break; case SCTP_STREAM_RESET_EVENT: GST_DEBUG_OBJECT (self, "Event: SCTP_STREAM_RESET_EVENT"); handle_stream_reset_event (self, ¬ification->sn_strreset_event); break; case SCTP_SENDER_DRY_EVENT: GST_DEBUG_OBJECT (self, "Event: SCTP_SENDER_DRY_EVENT"); break; case SCTP_NOTIFICATIONS_STOPPED_EVENT: GST_DEBUG_OBJECT (self, "Event: SCTP_NOTIFICATIONS_STOPPED_EVENT"); break; case SCTP_ASSOC_RESET_EVENT: GST_DEBUG_OBJECT (self, "Event: SCTP_ASSOC_RESET_EVENT"); break; case SCTP_STREAM_CHANGE_EVENT: GST_DEBUG_OBJECT (self, "Event: SCTP_STREAM_CHANGE_EVENT"); break; case SCTP_SEND_FAILED_EVENT: GST_ERROR_OBJECT (self, "Event: SCTP_SEND_FAILED_EVENT (%u)", notification->sn_send_failed_event.ssfe_error); break; default: break; } } static void handle_association_changed (GstSctpAssociation * self, const struct sctp_assoc_change *sac) { gboolean change_state = FALSE; GstSctpAssociationState new_state; switch (sac->sac_state) { case SCTP_COMM_UP: GST_DEBUG_OBJECT (self, "SCTP_COMM_UP"); g_rec_mutex_lock (&self->association_mutex); if (self->state == GST_SCTP_ASSOCIATION_STATE_CONNECTING) { change_state = TRUE; new_state = GST_SCTP_ASSOCIATION_STATE_CONNECTED; GST_DEBUG_OBJECT (self, "SCTP association connected!"); } else if (self->state == GST_SCTP_ASSOCIATION_STATE_CONNECTED) { GST_FIXME_OBJECT (self, "SCTP association already open"); } else { GST_WARNING_OBJECT (self, "SCTP association in unexpected state"); } g_rec_mutex_unlock (&self->association_mutex); break; case SCTP_COMM_LOST: GST_WARNING_OBJECT (self, "SCTP event SCTP_COMM_LOST received"); change_state = TRUE; new_state = GST_SCTP_ASSOCIATION_STATE_ERROR; break; case SCTP_RESTART: GST_DEBUG_OBJECT (self, "SCTP event SCTP_RESTART received"); break; case SCTP_SHUTDOWN_COMP: GST_DEBUG_OBJECT (self, "SCTP event SCTP_SHUTDOWN_COMP received"); change_state = TRUE; new_state = GST_SCTP_ASSOCIATION_STATE_DISCONNECTED; break; case SCTP_CANT_STR_ASSOC: GST_WARNING_OBJECT (self, "SCTP event SCTP_CANT_STR_ASSOC received"); change_state = TRUE; new_state = GST_SCTP_ASSOCIATION_STATE_ERROR; break; } if (change_state) gst_sctp_association_change_state (self, new_state, TRUE); } static void handle_stream_reset_event (GstSctpAssociation * self, const struct sctp_stream_reset_event *sr) { guint32 i, n; if (!(sr->strreset_flags & SCTP_STREAM_RESET_DENIED) && !(sr->strreset_flags & SCTP_STREAM_RESET_DENIED)) { n = (sr->strreset_length - sizeof (struct sctp_stream_reset_event)) / sizeof (uint16_t); for (i = 0; i < n; i++) { if (sr->strreset_flags & SCTP_STREAM_RESET_INCOMING_SSN) { g_signal_emit (self, signals[SIGNAL_STREAM_RESET], 0, sr->strreset_stream_list[i]); } } } } static void handle_message (GstSctpAssociation * self, guint8 * data, guint32 datalen, guint16 stream_id, guint32 ppid) { g_rec_mutex_lock (&self->association_mutex); if (self->packet_received_cb) { /* It's the callbacks job to free the data correctly */ self->packet_received_cb (self, data, datalen, stream_id, ppid, self->packet_received_user_data); } else { /* We use this instead of a bare `free()` so that we use the `free` from * the C runtime that usrsctp was built with. This makes a difference on * Windows where libusrstcp and GStreamer can be linked to two different * CRTs. */ usrsctp_freedumpbuffer ((gchar *) data); } g_rec_mutex_unlock (&self->association_mutex); } /* Returns TRUE if notify==FALSE and notification is needed later */ static gboolean gst_sctp_association_change_state (GstSctpAssociation * self, GstSctpAssociationState new_state, gboolean notify) { if (self->state != new_state && self->state != GST_SCTP_ASSOCIATION_STATE_ERROR) { self->state = new_state; if (notify) { g_object_notify_by_pspec (G_OBJECT (self), properties[PROP_STATE]); return FALSE; } else { return TRUE; } } else { return FALSE; } }