sctpenc: Correctly log/handle errors and handle short writes

This commit is contained in:
Sebastian Dröge 2020-01-30 16:11:57 +02:00
parent e9df80b235
commit db16265d86
2 changed files with 34 additions and 16 deletions

View file

@ -564,6 +564,8 @@ gst_sctp_enc_sink_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
GstMeta *meta; GstMeta *meta;
const GstMetaInfo *meta_info = GST_SCTP_SEND_META_INFO; const GstMetaInfo *meta_info = GST_SCTP_SEND_META_INFO;
GstFlowReturn flow_ret = GST_FLOW_ERROR; GstFlowReturn flow_ret = GST_FLOW_ERROR;
const guint8 *data;
guint32 length;
GST_OBJECT_LOCK (self); GST_OBJECT_LOCK (self);
if (self->src_ret != GST_FLOW_OK) { if (self->src_ret != GST_FLOW_OK) {
@ -611,23 +613,32 @@ gst_sctp_enc_sink_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
goto error; goto error;
} }
data = map.data;
length = map.size;
g_mutex_lock (&sctpenc_pad->lock); g_mutex_lock (&sctpenc_pad->lock);
while (!sctpenc_pad->flushing) { while (!sctpenc_pad->flushing) {
gboolean data_sent = FALSE; gint32 bytes_sent;
g_mutex_unlock (&sctpenc_pad->lock); g_mutex_unlock (&sctpenc_pad->lock);
data_sent = bytes_sent =
gst_sctp_association_send_data (self->sctp_association, map.data, gst_sctp_association_send_data (self->sctp_association, data,
map.size, sctpenc_pad->stream_id, ppid, ordered, pr, pr_param); length, sctpenc_pad->stream_id, ppid, ordered, pr, pr_param);
g_mutex_lock (&sctpenc_pad->lock); g_mutex_lock (&sctpenc_pad->lock);
if (data_sent) { if (bytes_sent < 0) {
sctpenc_pad->bytes_sent += map.size; GST_ELEMENT_ERROR (self, RESOURCE, WRITE, (NULL),
break; ("Failed to send data"));
} else if (!sctpenc_pad->flushing) { flow_ret = GST_FLOW_ERROR;
goto out;
} else if (bytes_sent < length && !sctpenc_pad->flushing) {
gint64 end_time = g_get_monotonic_time () + BUFFER_FULL_SLEEP_TIME; gint64 end_time = g_get_monotonic_time () + BUFFER_FULL_SLEEP_TIME;
sctpenc_pad->bytes_sent += bytes_sent;
data += bytes_sent;
length -= bytes_sent;
/* The buffer was probably full. Retry in a while */ /* The buffer was probably full. Retry in a while */
GST_OBJECT_LOCK (self); GST_OBJECT_LOCK (self);
g_queue_push_tail (&self->pending_pads, sctpenc_pad); g_queue_push_tail (&self->pending_pads, sctpenc_pad);
@ -638,9 +649,14 @@ gst_sctp_enc_sink_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
GST_OBJECT_LOCK (self); GST_OBJECT_LOCK (self);
g_queue_remove (&self->pending_pads, sctpenc_pad); g_queue_remove (&self->pending_pads, sctpenc_pad);
GST_OBJECT_UNLOCK (self); GST_OBJECT_UNLOCK (self);
} else if (bytes_sent == length) {
sctpenc_pad->bytes_sent += bytes_sent;
break;
} }
} }
flow_ret = sctpenc_pad->flushing ? GST_FLOW_FLUSHING : GST_FLOW_OK; flow_ret = sctpenc_pad->flushing ? GST_FLOW_FLUSHING : GST_FLOW_OK;
out:
g_mutex_unlock (&sctpenc_pad->lock); g_mutex_unlock (&sctpenc_pad->lock);
gst_buffer_unmap (buffer, &map); gst_buffer_unmap (buffer, &map);

View file

@ -29,6 +29,7 @@
#include "sctpassociation.h" #include "sctpassociation.h"
#include <gst/gst.h>
#include <string.h> #include <string.h>
#include <errno.h> #include <errno.h>
#include <stdlib.h> #include <stdlib.h>
@ -430,19 +431,20 @@ gst_sctp_association_incoming_packet (GstSctpAssociation * self,
usrsctp_conninput ((void *) self, (const void *) buf, (size_t) length, 0); usrsctp_conninput ((void *) self, (const void *) buf, (size_t) length, 0);
} }
gboolean gint32
gst_sctp_association_send_data (GstSctpAssociation * self, const guint8 * buf, gst_sctp_association_send_data (GstSctpAssociation * self, const guint8 * buf,
guint32 length, guint16 stream_id, guint32 ppid, gboolean ordered, guint32 length, guint16 stream_id, guint32 ppid, gboolean ordered,
GstSctpAssociationPartialReliability pr, guint32 reliability_param) GstSctpAssociationPartialReliability pr, guint32 reliability_param)
{ {
struct sctp_sendv_spa spa; struct sctp_sendv_spa spa;
gint32 bytes_sent; gint32 bytes_sent = -1;
gboolean result = FALSE;
struct sockaddr_conn remote_addr; struct sockaddr_conn remote_addr;
g_rec_mutex_lock (&self->association_mutex); g_rec_mutex_lock (&self->association_mutex);
if (self->state != GST_SCTP_ASSOCIATION_STATE_CONNECTED) if (self->state != GST_SCTP_ASSOCIATION_STATE_CONNECTED) {
GST_ERROR_OBJECT (self, "Association not connected yet");
goto end; goto end;
}
memset (&spa, 0, sizeof (spa)); memset (&spa, 0, sizeof (spa));
@ -470,19 +472,19 @@ gst_sctp_association_send_data (GstSctpAssociation * self, const guint8 * buf,
(socklen_t) sizeof (struct sctp_sendv_spa), SCTP_SENDV_SPA, 0); (socklen_t) sizeof (struct sctp_sendv_spa), SCTP_SENDV_SPA, 0);
if (bytes_sent < 0) { if (bytes_sent < 0) {
if (errno == EAGAIN || errno == EWOULDBLOCK) { if (errno == EAGAIN || errno == EWOULDBLOCK) {
bytes_sent = 0;
/* Resending this buffer is taken care of by the gstsctpenc */ /* Resending this buffer is taken care of by the gstsctpenc */
goto end; goto end;
} else { } else {
g_warning ("Error sending data on stream %u: (%u) %s", stream_id, errno, GST_ERROR_OBJECT (self, "Error sending data on stream %u: (%u) %s",
strerror (errno)); stream_id, errno, g_strerror (errno));
goto end; goto end;
} }
} }
result = TRUE;
end: end:
g_rec_mutex_unlock (&self->association_mutex); g_rec_mutex_unlock (&self->association_mutex);
return result; return bytes_sent;
} }