diff --git a/ext/dtls/gstdtlsconnection.c b/ext/dtls/gstdtlsconnection.c index 86a0e4e561..a3dfeb7ca5 100644 --- a/ext/dtls/gstdtlsconnection.c +++ b/ext/dtls/gstdtlsconnection.c @@ -87,6 +87,10 @@ struct _GstDtlsConnectionPrivate gboolean is_alive; gboolean keys_exported; + gboolean sent_close_notify; + gboolean received_close_notify; + gboolean fatal_error; + GMutex mutex; GCond condition; gpointer bio_buffer; @@ -112,7 +116,9 @@ static void gst_dtls_connection_set_property (GObject *, guint prop_id, static void log_state (GstDtlsConnection *, const gchar * str); static void export_srtp_keys (GstDtlsConnection *); -static void openssl_poll (GstDtlsConnection *); +static GstFlowReturn openssl_poll (GstDtlsConnection *, GError ** err); +static GstFlowReturn handle_error (GstDtlsConnection * self, int ret, + GstResourceError error_type, GError ** err); static int openssl_verify_callback (int preverify_ok, X509_STORE_CTX * x509_ctx); @@ -284,16 +290,18 @@ gst_dtls_connection_set_property (GObject * object, guint prop_id, } } -void -gst_dtls_connection_start (GstDtlsConnection * self, gboolean is_client) +gboolean +gst_dtls_connection_start (GstDtlsConnection * self, gboolean is_client, + GError ** err) { GstDtlsConnectionPrivate *priv; + gboolean ret; priv = self->priv; - g_return_if_fail (priv->send_callback); - g_return_if_fail (priv->ssl); - g_return_if_fail (priv->bio); + g_return_val_if_fail (priv->send_callback, FALSE); + g_return_val_if_fail (priv->ssl, FALSE); + g_return_val_if_fail (priv->bio, FALSE); GST_TRACE_OBJECT (self, "locking @ start"); g_mutex_lock (&priv->mutex); @@ -305,6 +313,10 @@ gst_dtls_connection_start (GstDtlsConnection * self, gboolean is_client) priv->bio_buffer_offset = 0; priv->keys_exported = FALSE; + priv->fatal_error = FALSE; + priv->sent_close_notify = FALSE; + priv->received_close_notify = FALSE; + priv->is_client = is_client; if (priv->is_client) { SSL_set_connect_state (priv->ssl); @@ -313,12 +325,19 @@ gst_dtls_connection_start (GstDtlsConnection * self, gboolean is_client) } log_state (self, "initial state set"); - openssl_poll (self); + ret = openssl_poll (self, err); + if (ret == GST_FLOW_EOS && err) { + *err = + g_error_new_literal (GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_OPEN_WRITE, + "Connection closed"); + } log_state (self, "first poll done"); GST_TRACE_OBJECT (self, "unlocking @ start"); g_mutex_unlock (&priv->mutex); + + return ret == GST_FLOW_OK; } static void @@ -342,7 +361,7 @@ handle_timeout (gpointer data, gpointer user_data) GST_WARNING_OBJECT (self, "handling timeout failed"); } else if (ret > 0) { log_state (self, "handling timeout before poll"); - openssl_poll (self); + openssl_poll (self, NULL); log_state (self, "handling timeout after poll"); } } @@ -507,11 +526,13 @@ gst_dtls_connection_set_send_callback (GstDtlsConnection * self, g_mutex_unlock (&priv->mutex); } -gint -gst_dtls_connection_process (GstDtlsConnection * self, gpointer data, gint len) +GstFlowReturn +gst_dtls_connection_process (GstDtlsConnection * self, gpointer data, gsize len, + gsize * written, GError ** err) { + GstFlowReturn flow_ret = GST_FLOW_OK; GstDtlsConnectionPrivate *priv; - gint result; + int ret; g_return_val_if_fail (GST_IS_DTLS_CONNECTION (self), 0); g_return_val_if_fail (self->priv->ssl, 0); @@ -525,6 +546,22 @@ gst_dtls_connection_process (GstDtlsConnection * self, gpointer data, gint len) g_warn_if_fail (!priv->bio_buffer); + if (self->priv->received_close_notify) { + GST_DEBUG_OBJECT (self, "Already received close_notify"); + g_mutex_unlock (&priv->mutex); + return GST_FLOW_EOS; + } + + if (self->priv->fatal_error) { + GST_ERROR_OBJECT (self, "Had a fatal error before"); + g_mutex_unlock (&priv->mutex); + if (err) + *err = + g_error_new_literal (GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_READ, + "Had fatal error before"); + return GST_FLOW_ERROR; + } + priv->bio_buffer = data; priv->bio_buffer_len = len; priv->bio_buffer_offset = 0; @@ -532,29 +569,50 @@ gst_dtls_connection_process (GstDtlsConnection * self, gpointer data, gint len) log_state (self, "process start"); if (SSL_want_write (priv->ssl)) { - openssl_poll (self); + flow_ret = openssl_poll (self, err); log_state (self, "process want write, after poll"); + if (flow_ret != GST_FLOW_OK) { + g_mutex_unlock (&priv->mutex); + return flow_ret; + } } - result = SSL_read (priv->ssl, data, len); + ret = SSL_read (priv->ssl, data, len); + *written = ret >= 0 ? ret : 0; + GST_DEBUG_OBJECT (self, "read result: %d", ret); + + flow_ret = handle_error (self, ret, GST_RESOURCE_ERROR_READ, err); + if (flow_ret == GST_FLOW_EOS) { + self->priv->received_close_notify = TRUE; + /* Notify about the connection being properly closed now if both + * sides did so */ + if (self->priv->sent_close_notify && self->priv->send_callback) + self->priv->send_callback (self, NULL, 0, NULL); + + g_mutex_unlock (&priv->mutex); + return flow_ret; + } else if (flow_ret != GST_FLOW_OK) { + g_mutex_unlock (&priv->mutex); + return flow_ret; + } log_state (self, "process after read"); - openssl_poll (self); + flow_ret = openssl_poll (self, err); log_state (self, "process after poll"); - GST_DEBUG_OBJECT (self, "read result: %d", result); - GST_TRACE_OBJECT (self, "unlocking @ process"); g_mutex_unlock (&priv->mutex); - return result; + return flow_ret; } -gint -gst_dtls_connection_send (GstDtlsConnection * self, gpointer data, gint len) +GstFlowReturn +gst_dtls_connection_send (GstDtlsConnection * self, gconstpointer data, + gsize len, gsize * written, GError ** err) { + GstFlowReturn flow_ret; int ret = 0; g_return_val_if_fail (GST_IS_DTLS_CONNECTION (self), 0); @@ -566,20 +624,65 @@ gst_dtls_connection_send (GstDtlsConnection * self, gpointer data, gint len) g_mutex_lock (&self->priv->mutex); GST_TRACE_OBJECT (self, "locked @ send"); - if (SSL_is_init_finished (self->priv->ssl)) { + if (self->priv->fatal_error) { + GST_ERROR_OBJECT (self, "Had a fatal error before"); + g_mutex_unlock (&self->priv->mutex); + if (err) + *err = + g_error_new_literal (GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_WRITE, + "Had fatal error before"); + return GST_FLOW_ERROR; + } + + if (self->priv->sent_close_notify) { + len = 0; + GST_DEBUG_OBJECT (self, "Not sending new data after close_notify"); + } + + if (len == 0) { + if (written) + *written = 0; + GST_DEBUG_OBJECT (self, "Sending close_notify"); + ret = SSL_shutdown (self->priv->ssl); + self->priv->sent_close_notify = TRUE; + if (ret == 1) { + GST_LOG_OBJECT (self, "received peer close_notify already"); + self->priv->received_close_notify = TRUE; + flow_ret = GST_FLOW_EOS; + } else if (ret == 0) { + GST_LOG_OBJECT (self, "did not receive peer close_notify yet"); + flow_ret = GST_FLOW_OK; + } else { + flow_ret = handle_error (self, ret, GST_RESOURCE_ERROR_WRITE, err); + } + } else if (SSL_is_init_finished (self->priv->ssl)) { + GST_DEBUG_OBJECT (self, "sending data of %" G_GSIZE_FORMAT " B", len); ret = SSL_write (self->priv->ssl, data, len); - GST_DEBUG_OBJECT (self, "data sent: input was %d B, output is %d B", len, - ret); + if (ret <= 0) { + if (written) + *written = 0; + flow_ret = handle_error (self, ret, GST_RESOURCE_ERROR_WRITE, err); + } else { + if (written) + *written = ret; + flow_ret = GST_FLOW_OK; + } } else { + if (written) + *written = ret; GST_WARNING_OBJECT (self, "tried to send data before handshake was complete"); - ret = 0; + if (err) + *err = + g_error_new_literal (GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_WRITE, + "Tried to send data before handshake was complete"); + flow_ret = GST_FLOW_ERROR; } GST_TRACE_OBJECT (self, "unlocking @ send"); g_mutex_unlock (&self->priv->mutex); - return ret; + return flow_ret; } /* @@ -724,11 +827,78 @@ ssl_err_cb (const char *str, size_t len, void *u) return 0; } -static void -openssl_poll (GstDtlsConnection * self) +static GstFlowReturn +handle_error (GstDtlsConnection * self, int ret, GstResourceError error_type, + GError ** err) +{ + int error; + + error = SSL_get_error (self->priv->ssl, ret); + + switch (error) { + case SSL_ERROR_NONE: + GST_TRACE_OBJECT (self, "No error"); + return GST_FLOW_OK; + case SSL_ERROR_SSL: + GST_ERROR_OBJECT (self, "Fatal SSL error"); + self->priv->fatal_error = TRUE; + ERR_print_errors_cb (ssl_err_cb, self); + if (err) + *err = + g_error_new_literal (GST_RESOURCE_ERROR, error_type, + "Fatal SSL error"); + return GST_FLOW_ERROR; + case SSL_ERROR_ZERO_RETURN: + GST_LOG_OBJECT (self, "Connection was closed"); + return GST_FLOW_EOS; + case SSL_ERROR_WANT_READ: + GST_LOG_OBJECT (self, "SSL wants read"); + return GST_FLOW_OK; + case SSL_ERROR_WANT_WRITE: + GST_LOG_OBJECT (self, "SSL wants write"); + return GST_FLOW_OK; + case SSL_ERROR_SYSCALL:{ + gchar message[1024] = ""; + gint syserror; +#ifdef G_OS_WIN32 + syserror = WSAGetLastError (); + FormatMessage (FORMAT_MESSAGE_FROM_SYSTEM, NULL, syserror, 0, message, + sizeof message, NULL); +#else + syserror = errno; + strerror_r (syserror, message, sizeof message); +#endif + + if (syserror == 0) { + GST_TRACE_OBJECT (self, "No error"); + return GST_FLOW_OK; + } else { + GST_ERROR_OBJECT (self, "Fatal SSL syscall error: errno %d: %s", + syserror, message); + if (err) + *err = + g_error_new (GST_RESOURCE_ERROR, error_type, + "Fatal SSL syscall error: errno %d: %s", syserror, message); + self->priv->fatal_error = TRUE; + return GST_FLOW_ERROR; + } + } + default: + self->priv->fatal_error = TRUE; + GST_ERROR_OBJECT (self, "Unknown SSL error: %d, ret: %d", error, ret); + if (err) + *err = + g_error_new (GST_RESOURCE_ERROR, error_type, + "Unknown SSL error: %d, ret: %d", error, ret); + return GST_FLOW_ERROR; + } +} + +static GstFlowReturn +openssl_poll (GstDtlsConnection * self, GError ** err) { int ret; - int error; + GstFlowReturn flow_ret; log_state (self, "poll: before handshake"); @@ -746,54 +916,23 @@ openssl_poll (GstDtlsConnection * self) } else { GST_INFO_OBJECT (self, "handshake is completed"); } - return; + return GST_FLOW_OK; case 0: GST_DEBUG_OBJECT (self, "do_handshake encountered EOF"); break; case -1: - GST_DEBUG_OBJECT (self, "do_handshake encountered BIO error"); + GST_DEBUG_OBJECT (self, "do_handshake encountered potential BIO error"); break; default: GST_DEBUG_OBJECT (self, "do_handshake returned %d", ret); + break; } - error = SSL_get_error (self->priv->ssl, ret); - - switch (error) { - case SSL_ERROR_NONE: - GST_WARNING_OBJECT (self, "no error, handshake should be done"); - break; - case SSL_ERROR_SSL: - GST_ERROR_OBJECT (self, "SSL error"); - ERR_print_errors_cb (ssl_err_cb, self); - return; - case SSL_ERROR_WANT_READ: - GST_LOG_OBJECT (self, "SSL wants read"); - break; - case SSL_ERROR_WANT_WRITE: - GST_LOG_OBJECT (self, "SSL wants write"); - break; - case SSL_ERROR_SYSCALL:{ - gchar message[1024] = ""; - gint syserror; -#ifdef G_OS_WIN32 - syserror = WSAGetLastError (); - FormatMessage (FORMAT_MESSAGE_FROM_SYSTEM, NULL, syserror, 0, message, - sizeof message, NULL); -#else - syserror = errno; - strerror_r (syserror, message, sizeof message); -#endif - GST_CAT_LEVEL_LOG (GST_CAT_DEFAULT, - syserror != 0 ? GST_LEVEL_WARNING : GST_LEVEL_LOG, - self, "SSL syscall error: errno %d: %s", syserror, message); - break; - } - default: - GST_WARNING_OBJECT (self, "Unknown SSL error: %d, ret: %d", error, ret); - } + flow_ret = handle_error (self, ret, GST_RESOURCE_ERROR_OPEN_WRITE, err); ERR_print_errors_cb (ssl_warn_cb, self); + + return flow_ret; } static int diff --git a/ext/dtls/gstdtlsconnection.h b/ext/dtls/gstdtlsconnection.h index bc2c569b1b..6315ef12e5 100644 --- a/ext/dtls/gstdtlsconnection.h +++ b/ext/dtls/gstdtlsconnection.h @@ -26,7 +26,7 @@ #ifndef gstdtlsconnection_h #define gstdtlsconnection_h -#include +#include G_BEGIN_DECLS @@ -84,7 +84,7 @@ struct _GstDtlsConnectionClass { GType gst_dtls_connection_get_type(void) G_GNUC_CONST; -void gst_dtls_connection_start(GstDtlsConnection *, gboolean is_client); +gboolean gst_dtls_connection_start(GstDtlsConnection *, gboolean is_client, GError **err); void gst_dtls_connection_check_timeout(GstDtlsConnection *); /* @@ -108,15 +108,30 @@ void gst_dtls_connection_set_send_callback(GstDtlsConnection *, GstDtlsConnectio /* * Processes data that has been received, the transformation is done in-place. - * Returns the length of the plaintext data that was decoded, if no data is available, 0<= will be returned. + * + * Returns: + * - GST_FLOW_EOS if the receive side of the DTLS connection was closed by + * the peer, i.e. close_notify was sent by the peer + * - GST_FLOW_ERROR + err if an error happened + * - GST_FLOW_OK + written >= 0 if processing was successful. ptr then + * contains the decoded bytes */ -gint gst_dtls_connection_process(GstDtlsConnection *, gpointer ptr, gint len); +GstFlowReturn gst_dtls_connection_process(GstDtlsConnection *, gpointer ptr, gsize len, gsize *written, GError **err); /* - * If the DTLS handshake is completed this function will encode the given data. - * Returns the length of the data sent, or 0 if the DTLS handshake is not completed. + * Will encode and send the given data. + * + * Sending with len == 0 will close the send side of the DTLS connection and + * no further data can be sent anymore in the future. This will also send the + * close_notify to the peer. + * + * Returns: + * - GST_FLOW_EOS if the send side of the DTLS connection was closed, i.e. + * we received an EOS before. + * - GST_FLOW_ERROR + err if an error happened + * - GST_FLOW_OK + written >= 0 if processing was successful */ -gint gst_dtls_connection_send(GstDtlsConnection *, gpointer ptr, gint len); +GstFlowReturn gst_dtls_connection_send(GstDtlsConnection *, gconstpointer ptr, gsize len, gsize *written, GError **err); G_END_DECLS diff --git a/ext/dtls/gstdtlsdec.c b/ext/dtls/gstdtlsdec.c index 0d4ffe7826..bba0035c9a 100644 --- a/ext/dtls/gstdtlsdec.c +++ b/ext/dtls/gstdtlsdec.c @@ -452,62 +452,105 @@ on_peer_certificate_received (GstDtlsConnection * connection, gchar * pem, return TRUE; } -static gint +static GstFlowReturn process_buffer (GstDtlsDec * self, GstBuffer * buffer) { + GstFlowReturn flow_ret; GstMapInfo map_info; - gint size; + GError *err = NULL; + gsize written = 0; if (!gst_buffer_map (buffer, &map_info, GST_MAP_READWRITE)) - return 0; + return GST_FLOW_ERROR; if (!map_info.size) { gst_buffer_unmap (buffer, &map_info); - return 0; + return GST_FLOW_ERROR; } - size = + flow_ret = gst_dtls_connection_process (self->connection, map_info.data, - map_info.size); + map_info.size, &written, &err); gst_buffer_unmap (buffer, &map_info); - if (size <= 0) - return size; + switch (flow_ret) { + case GST_FLOW_OK: + GST_LOG_OBJECT (self, + "Decoded buffer of size %" G_GSIZE_FORMAT " B to %" G_GSIZE_FORMAT, + map_info.size, written); + gst_buffer_set_size (buffer, written); + break; + case GST_FLOW_EOS: + gst_buffer_set_size (buffer, written); + GST_DEBUG_OBJECT (self, "Peer closed the connection"); + break; + case GST_FLOW_ERROR: + GST_ERROR_OBJECT (self, "Error processing buffer: %s", err->message); + GST_ELEMENT_ERROR (self, RESOURCE, READ, (NULL), ("%s", err->message)); + g_clear_error (&err); + break; + default: + g_assert_not_reached (); + } + g_assert (err == NULL); - gst_buffer_set_size (buffer, size); - - return size; + return flow_ret; } +typedef struct +{ + GstDtlsDec *self; + GstFlowReturn flow_ret; + guint processed; +} ProcessListData; + static gboolean process_buffer_from_list (GstBuffer ** buffer, guint idx, gpointer user_data) { - GstDtlsDec *self = GST_DTLS_DEC (user_data); - gint size; + ProcessListData *process_list_data = user_data; + GstDtlsDec *self = GST_DTLS_DEC (process_list_data->self); + GstFlowReturn flow_ret; *buffer = gst_buffer_make_writable (*buffer); - size = process_buffer (self, *buffer); - if (size <= 0) - gst_buffer_replace (buffer, NULL); + flow_ret = process_buffer (self, *buffer); - return TRUE; + process_list_data->flow_ret = flow_ret; + if (gst_buffer_get_size (*buffer) == 0) + gst_buffer_replace (buffer, NULL); + else if (flow_ret != GST_FLOW_ERROR) + process_list_data->processed++; + + return flow_ret == GST_FLOW_OK; } static GstFlowReturn sink_chain_list (GstPad * pad, GstObject * parent, GstBufferList * list) { GstDtlsDec *self = GST_DTLS_DEC (parent); - GstFlowReturn ret = GST_FLOW_OK; GstPad *other_pad; + ProcessListData process_list_data = { self, GST_FLOW_OK, 0 }; list = gst_buffer_list_make_writable (list); - gst_buffer_list_foreach (list, process_buffer_from_list, self); + gst_buffer_list_foreach (list, process_buffer_from_list, &process_list_data); + + /* If we successfully processed at least some buffers then forward those */ + if (process_list_data.flow_ret != GST_FLOW_OK + && process_list_data.processed == 0) { + GST_ERROR_OBJECT (self, "Failed to process buffer list: %s", + gst_flow_get_name (process_list_data.flow_ret)); + gst_buffer_list_unref (list); + return process_list_data.flow_ret; + } + + /* Remove all buffers after the first one that failed to be processed */ + gst_buffer_list_remove (list, process_list_data.processed, + gst_buffer_list_length (list) - process_list_data.processed); if (gst_buffer_list_length (list) == 0) { GST_DEBUG_OBJECT (self, "Not produced any buffers"); gst_buffer_list_unref (list); - return GST_FLOW_OK; + return process_list_data.flow_ret; } g_mutex_lock (&self->src_mutex); @@ -517,17 +560,25 @@ sink_chain_list (GstPad * pad, GstObject * parent, GstBufferList * list) g_mutex_unlock (&self->src_mutex); if (other_pad) { - GST_LOG_OBJECT (self, "decoded buffer list with length %u, pushing", + gboolean was_eos = process_list_data.flow_ret == GST_FLOW_EOS; + + GST_LOG_OBJECT (self, "pushing buffer list with length %u", gst_buffer_list_length (list)); - ret = gst_pad_push_list (other_pad, list); + process_list_data.flow_ret = gst_pad_push_list (other_pad, list); + + /* If the peer closed the connection, signal that we're done here now */ + if (was_eos) + gst_pad_push_event (other_pad, gst_event_new_eos ()); + gst_object_unref (other_pad); } else { - GST_LOG_OBJECT (self, "dropped buffer list with length %d, not linked", + GST_LOG_OBJECT (self, + "dropping buffer list with length %d, have no source pad", gst_buffer_list_length (list)); gst_buffer_list_unref (list); } - return ret; + return process_list_data.flow_ret; } static GstFlowReturn @@ -535,7 +586,6 @@ sink_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer) { GstDtlsDec *self = GST_DTLS_DEC (parent); GstFlowReturn ret = GST_FLOW_OK; - gint size; GstPad *other_pad; if (!self->agent) { @@ -548,12 +598,12 @@ sink_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer) self->connection_id, gst_buffer_get_size (buffer)); buffer = gst_buffer_make_writable (buffer); - size = process_buffer (self, buffer); - - if (size <= 0) { + ret = process_buffer (self, buffer); + if (ret == GST_FLOW_ERROR) { + GST_ERROR_OBJECT (self, "Failed to process buffer: %s", + gst_flow_get_name (ret)); gst_buffer_unref (buffer); - - return GST_FLOW_OK; + return ret; } g_mutex_lock (&self->src_mutex); @@ -563,11 +613,25 @@ sink_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer) g_mutex_unlock (&self->src_mutex); if (other_pad) { - GST_LOG_OBJECT (self, "decoded buffer with length %d, pushing", size); - ret = gst_pad_push (other_pad, buffer); + gboolean was_eos = (ret == GST_FLOW_EOS); + + if (gst_buffer_get_size (buffer) > 0) { + GST_LOG_OBJECT (self, "pushing buffer"); + ret = gst_pad_push (other_pad, buffer); + } else { + gst_buffer_unref (buffer); + } + + /* If the peer closed the connection, signal that we're done here now */ + if (was_eos) { + gst_pad_push_event (other_pad, gst_event_new_eos ()); + if (ret == GST_FLOW_OK) + ret = GST_FLOW_EOS; + } + gst_object_unref (other_pad); } else { - GST_LOG_OBJECT (self, "dropped buffer with length %d, not linked", size); + GST_LOG_OBJECT (self, "dropping buffer, have no source pad"); gst_buffer_unref (buffer); } diff --git a/ext/dtls/gstdtlsenc.c b/ext/dtls/gstdtlsenc.c index 1f94c4b703..fd597227df 100644 --- a/ext/dtls/gstdtlsenc.c +++ b/ext/dtls/gstdtlsenc.c @@ -101,7 +101,7 @@ static gboolean sink_event (GstPad * pad, GstObject * parent, GstEvent * event); static void on_key_received (GstDtlsConnection *, gpointer key, guint cipher, guint auth, GstDtlsEnc *); static gboolean on_send_data (GstDtlsConnection *, gconstpointer data, - gint length, GstDtlsEnc *); + gsize length, GstDtlsEnc *); static void gst_dtls_enc_class_init (GstDtlsEncClass * klass) @@ -326,10 +326,17 @@ gst_dtls_enc_change_state (GstElement * element, GstStateChange transition) ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition); switch (transition) { - case GST_STATE_CHANGE_READY_TO_PAUSED: + case GST_STATE_CHANGE_READY_TO_PAUSED:{ + GError *err = NULL; + GST_DEBUG_OBJECT (self, "starting connection %s", self->connection_id); - gst_dtls_connection_start (self->connection, self->is_client); + if (!gst_dtls_connection_start (self->connection, self->is_client, &err)) { + GST_ELEMENT_ERROR (self, RESOURCE, OPEN_WRITE, (NULL), ("%s", + err->message)); + g_clear_error (&err); + } break; + } default: break; } @@ -460,17 +467,25 @@ src_task_loop (GstPad * pad) GST_TRACE_OBJECT (self, "src loop: releasing lock"); - ret = gst_pad_push (self->src, buffer); - if (check_connection_timeout) - gst_dtls_connection_check_timeout (self->connection); + if (buffer) { + ret = gst_pad_push (self->src, buffer); + if (check_connection_timeout) + gst_dtls_connection_check_timeout (self->connection); - if (G_UNLIKELY (ret == GST_FLOW_NOT_LINKED || ret < GST_FLOW_EOS)) { - GST_WARNING_OBJECT (self, "failed to push buffer on src pad: %s", - gst_flow_get_name (ret)); + if (G_UNLIKELY (ret == GST_FLOW_NOT_LINKED || ret < GST_FLOW_EOS)) { + GST_WARNING_OBJECT (self, "failed to push buffer on src pad: %s", + gst_flow_get_name (ret)); + } + g_mutex_lock (&self->queue_lock); + self->src_ret = ret; + g_mutex_unlock (&self->queue_lock); + } else { + GST_DEBUG_OBJECT (self, "Peer and us closed the connection, sending EOS"); + gst_pad_push_event (self->src, gst_event_new_eos ()); + g_mutex_lock (&self->queue_lock); + self->src_ret = GST_FLOW_EOS; + g_mutex_unlock (&self->queue_lock); } - g_mutex_lock (&self->queue_lock); - self->src_ret = ret; - g_mutex_unlock (&self->queue_lock); } static GstFlowReturn @@ -478,7 +493,9 @@ sink_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer) { GstDtlsEnc *self = GST_DTLS_ENC (parent); GstMapInfo map_info; - gint ret; + GError *err = NULL; + gsize to_write, written = 0; + GstFlowReturn ret = GST_FLOW_OK; g_mutex_lock (&self->queue_lock); if (self->src_ret != GST_FLOW_OK) { @@ -495,28 +512,48 @@ sink_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer) gst_buffer_map (buffer, &map_info, GST_MAP_READ); - if (map_info.size) { + to_write = map_info.size; + + while (to_write > 0 && ret == GST_FLOW_OK) { ret = gst_dtls_connection_send (self->connection, map_info.data, - map_info.size); - if (ret != map_info.size) { - GST_WARNING_OBJECT (self, - "error sending data: %d B were written, expected value was %" - G_GSIZE_FORMAT " B", ret, map_info.size); + map_info.size, &written, &err); + + switch (ret) { + case GST_FLOW_OK: + GST_DEBUG_OBJECT (self, + "Wrote %" G_GSIZE_FORMAT " B of %" G_GSIZE_FORMAT " B", written, + map_info.size); + g_assert (written <= to_write); + to_write -= written; + break; + case GST_FLOW_EOS: + GST_INFO_OBJECT (self, "Received data after the connection was closed"); + break; + case GST_FLOW_ERROR: + GST_WARNING_OBJECT (self, "error sending data: %s", err->message); + GST_ELEMENT_ERROR (self, RESOURCE, WRITE, (NULL), ("%s", err->message)); + g_clear_error (&err); + break; + default: + g_assert_not_reached (); + break; } + + g_assert (err == NULL); } gst_buffer_unmap (buffer, &map_info); - gst_buffer_unref (buffer); - return GST_FLOW_OK; + return ret; } static gboolean sink_event (GstPad * pad, GstObject * parent, GstEvent * event) { + GstDtlsEnc *self = GST_DTLS_ENC (parent); gboolean ret = FALSE; switch (GST_EVENT_TYPE (event)) { @@ -528,6 +565,28 @@ sink_event (GstPad * pad, GstObject * parent, GstEvent * event) gst_event_unref (event); ret = TRUE; break; + case GST_EVENT_EOS:{ + GstFlowReturn flow_ret; + + /* Close the write side of the connection now */ + flow_ret = + gst_dtls_connection_send (self->connection, NULL, 0, NULL, NULL); + + if (flow_ret != GST_FLOW_OK) + GST_ERROR_OBJECT (self, "Failed to send close_notify"); + + /* Do not forward the EOS event unless the peer already closed to the + * connection itself. If it didn't yet then we'll later get the send + * callback called with no data and send EOS from there */ + if (flow_ret == GST_FLOW_EOS) { + ret = gst_pad_event_default (pad, parent, event); + } else { + gst_event_unref (event); + ret = TRUE; + } + + break; + } default: ret = gst_pad_event_default (pad, parent, event); break; @@ -567,16 +626,17 @@ on_key_received (GstDtlsConnection * connection, gpointer key, guint cipher, } static gboolean -on_send_data (GstDtlsConnection * connection, gconstpointer data, gint length, +on_send_data (GstDtlsConnection * connection, gconstpointer data, gsize length, GstDtlsEnc * self) { GstBuffer *buffer; gboolean ret; - GST_DEBUG_OBJECT (self, "sending data from %s with length %d", + GST_DEBUG_OBJECT (self, "sending data from %s with length %" G_GSIZE_FORMAT, self->connection_id, length); - buffer = gst_buffer_new_wrapped (g_memdup (data, length), length); + buffer = + data ? gst_buffer_new_wrapped (g_memdup (data, length), length) : NULL; GST_TRACE_OBJECT (self, "send data: acquiring lock"); g_mutex_lock (&self->queue_lock);