curlsmtpsink: terminate transfer thread properly

If no EOS has been sent, the curl readfunc callback will
return ABORT. The media file in that case will not be properly
finalized.

https://bugzilla.gnome.org/show_bug.cgi?id=700886
This commit is contained in:
Patricia Muscalu 2013-05-23 14:32:07 +02:00 committed by Tim-Philipp Müller
parent a666843005
commit 8303561bd9
4 changed files with 118 additions and 36 deletions

View file

@ -147,6 +147,26 @@ static size_t transfer_data_buffer (void *curl_ptr, TransferBuffer * buf,
#define parent_class gst_curl_base_sink_parent_class #define parent_class gst_curl_base_sink_parent_class
G_DEFINE_TYPE (GstCurlBaseSink, gst_curl_base_sink, GST_TYPE_BASE_SINK); G_DEFINE_TYPE (GstCurlBaseSink, gst_curl_base_sink, GST_TYPE_BASE_SINK);
static gboolean
gst_curl_base_sink_default_has_buffered_data_unlocked (GstCurlBaseSink * sink)
{
return sink->transfer_buf->len > 0;
}
static gboolean
gst_curl_base_sink_has_buffered_data_unlocked (GstCurlBaseSink * sink)
{
GstCurlBaseSinkClass *klass;
gboolean res = FALSE;
klass = GST_CURL_BASE_SINK_GET_CLASS (sink);
if (klass->has_buffered_data_unlocked)
res = klass->has_buffered_data_unlocked (sink);
return res;
}
static void static void
gst_curl_base_sink_class_init (GstCurlBaseSinkClass * klass) gst_curl_base_sink_class_init (GstCurlBaseSinkClass * klass)
{ {
@ -179,6 +199,8 @@ gst_curl_base_sink_class_init (GstCurlBaseSinkClass * klass)
klass->handle_transfer = handle_transfer; klass->handle_transfer = handle_transfer;
klass->transfer_read_cb = gst_curl_base_sink_transfer_read_cb; klass->transfer_read_cb = gst_curl_base_sink_transfer_read_cb;
klass->transfer_data_buffer = gst_curl_base_sink_transfer_data_buffer; klass->transfer_data_buffer = gst_curl_base_sink_transfer_data_buffer;
klass->has_buffered_data_unlocked =
gst_curl_base_sink_default_has_buffered_data_unlocked;
/* FIXME: check against souphttpsrc and use same names for same properties */ /* FIXME: check against souphttpsrc and use same names for same properties */
g_object_class_install_property (gobject_class, PROP_LOCATION, g_object_class_install_property (gobject_class, PROP_LOCATION,
@ -685,9 +707,19 @@ gst_curl_base_sink_transfer_read_cb (void *curl_ptr, size_t size, size_t nmemb,
* then zero will be returned to indicate end of current transfer */ * then zero will be returned to indicate end of current transfer */
GST_OBJECT_LOCK (sink); GST_OBJECT_LOCK (sink);
if (gst_curl_base_sink_wait_for_data_unlocked (sink) == FALSE) { if (gst_curl_base_sink_wait_for_data_unlocked (sink) == FALSE) {
if (gst_curl_base_sink_has_buffered_data_unlocked (sink) &&
sink->transfer_thread_close) {
GST_WARNING_OBJECT (sink,
"discarding render data due to thread close flag");
GST_OBJECT_UNLOCK (sink);
return CURL_READFUNC_ABORT;
}
if (klass->flush_data_unlocked) { if (klass->flush_data_unlocked) {
bytes_to_send = klass->flush_data_unlocked (sink, curl_ptr, bytes_to_send = klass->flush_data_unlocked (sink, curl_ptr,
max_bytes_to_send, sink->new_file); max_bytes_to_send, sink->new_file, sink->transfer_thread_close);
GST_OBJECT_UNLOCK (sink); GST_OBJECT_UNLOCK (sink);
@ -800,6 +832,13 @@ handle_transfer (GstCurlBaseSink * sink)
} else if (errno == EBUSY) { } else if (errno == EBUSY) {
GST_DEBUG_OBJECT (sink, "poll stopped"); GST_DEBUG_OBJECT (sink, "poll stopped");
retval = GST_FLOW_EOS; retval = GST_FLOW_EOS;
GST_OBJECT_LOCK (sink);
if (gst_curl_base_sink_has_buffered_data_unlocked (sink))
GST_WARNING_OBJECT (sink,
"discarding render data due to thread close flag");
GST_OBJECT_UNLOCK (sink);
goto fail; goto fail;
} else { } else {
GST_DEBUG_OBJECT (sink, "poll failed: %s", g_strerror (errno)); GST_DEBUG_OBJECT (sink, "poll failed: %s", g_strerror (errno));

View file

@ -101,7 +101,8 @@ struct _GstCurlBaseSinkClass
size_t (*transfer_data_buffer) (GstCurlBaseSink * sink, void *curl_ptr, size_t (*transfer_data_buffer) (GstCurlBaseSink * sink, void *curl_ptr,
size_t block_size, guint * last_chunk); size_t block_size, guint * last_chunk);
size_t (*flush_data_unlocked) (GstCurlBaseSink * sink, void *curl_ptr, size_t (*flush_data_unlocked) (GstCurlBaseSink * sink, void *curl_ptr,
size_t block_size, gboolean new_file); size_t block_size, gboolean new_file, gboolean close_transfer);
gboolean (*has_buffered_data_unlocked) (GstCurlBaseSink * sink);
}; };
GType gst_curl_base_sink_get_type (void); GType gst_curl_base_sink_get_type (void);

View file

@ -122,7 +122,7 @@ static gboolean gst_curl_smtp_sink_prepare_transfer (GstCurlBaseSink * bcsink);
static size_t gst_curl_smtp_sink_transfer_data_buffer (GstCurlBaseSink * sink, static size_t gst_curl_smtp_sink_transfer_data_buffer (GstCurlBaseSink * sink,
void *curl_ptr, size_t block_size, guint * last_chunk); void *curl_ptr, size_t block_size, guint * last_chunk);
static size_t gst_curl_smtp_sink_flush_data_unlocked (GstCurlBaseSink * bcsink, static size_t gst_curl_smtp_sink_flush_data_unlocked (GstCurlBaseSink * bcsink,
void *curl_ptr, size_t block_size, gboolean new_file); void *curl_ptr, size_t block_size, gboolean new_file, gboolean close_transfer);
/* private functions */ /* private functions */
@ -151,15 +151,48 @@ gst_curl_smtp_sink_wait_for_transfer_end_unlocked (GstCurlSmtpSink * sink)
GST_LOG ("final data sent"); GST_LOG ("final data sent");
} }
static void
add_final_boundary_unlocked (GstCurlSmtpSink * sink)
{
GByteArray *array;
gchar *boundary_end;
gsize len;
gint save, state;
gchar *data_out;
GST_DEBUG ("adding final boundary");
array = sink->base64_chunk->chunk_array;
g_assert (array);
/* it will need up to 5 bytes if line-breaking is enabled
* additional byte is needed for <CR> as it is not automatically added by glib */
data_out = g_malloc (6);
save = sink->base64_chunk->save;
state = sink->base64_chunk->state;
len = g_base64_encode_close (TRUE, data_out, &state, &save);
/* workaround */
data_out[len - 1] = '\r';
data_out[len] = '\n';
/* +1 for CR */
g_byte_array_append (array, (guint8 *) data_out, (guint) (len + 1));
g_free (data_out);
boundary_end = g_strdup_printf ("\r\n%s\r\n", BOUNDARY_STRING_END);
g_byte_array_append (array, (guint8 *) boundary_end, strlen (boundary_end));
g_free (boundary_end);
sink->final_boundary_added = TRUE;
}
static gboolean static gboolean
gst_curl_smtp_sink_event (GstBaseSink * bsink, GstEvent * event) gst_curl_smtp_sink_event (GstBaseSink * bsink, GstEvent * event)
{ {
GstCurlBaseSink *bcsink = GST_CURL_BASE_SINK (bsink); GstCurlBaseSink *bcsink = GST_CURL_BASE_SINK (bsink);
GstCurlSmtpSink *sink = GST_CURL_SMTP_SINK (bsink); GstCurlSmtpSink *sink = GST_CURL_SMTP_SINK (bsink);
GByteArray *array;
gchar *boundary_end;
switch (event->type) { switch (event->type) {
case GST_EVENT_EOS: case GST_EVENT_EOS:
GST_DEBUG_OBJECT (sink, "received EOS"); GST_DEBUG_OBJECT (sink, "received EOS");
@ -169,34 +202,8 @@ gst_curl_smtp_sink_event (GstBaseSink * bsink, GstEvent * event)
sink->eos = TRUE; sink->eos = TRUE;
GST_OBJECT_UNLOCK (sink); GST_OBJECT_UNLOCK (sink);
if (sink->base64_chunk != NULL) { if (sink->base64_chunk != NULL)
gsize len; add_final_boundary_unlocked (sink);
gint save, state;
gchar *data_out;
array = sink->base64_chunk->chunk_array;
g_assert (array);
GST_DEBUG ("adding final boundary");
/* it will need up to 5 bytes if line-breaking is enabled
* additional byte is needed for <CR> as it is not automatically added by glib */
data_out = g_malloc (6);
save = sink->base64_chunk->save;
state = sink->base64_chunk->state;
len = g_base64_encode_close (TRUE, data_out, &state, &save);
/* workaround */
data_out[len - 1] = '\r';
data_out[len] = '\n';
/* +1 for CR */
g_byte_array_append (array, (guint8 *) data_out, (guint) (len + 1));
g_free (data_out);
boundary_end = g_strdup_printf ("\r\n%s\r\n", BOUNDARY_STRING_END);
g_byte_array_append (array, (guint8 *) boundary_end,
strlen (boundary_end));
g_free (boundary_end);
}
gst_curl_base_sink_transfer_thread_notify_unlocked (bcsink); gst_curl_base_sink_transfer_thread_notify_unlocked (bcsink);
@ -217,6 +224,25 @@ gst_curl_smtp_sink_event (GstBaseSink * bsink, GstEvent * event)
return GST_BASE_SINK_CLASS (parent_class)->event (bsink, event); return GST_BASE_SINK_CLASS (parent_class)->event (bsink, event);
} }
static gboolean
gst_curl_smtp_sink_has_buffered_data_unlocked (GstCurlBaseSink * bcsink)
{
GstCurlSmtpSink *sink = GST_CURL_SMTP_SINK (bcsink);
Base64Chunk *chunk;
GByteArray *array = NULL;
gboolean ret = FALSE;
chunk = sink->base64_chunk;
if (chunk) {
array = chunk->chunk_array;
if (array)
ret = (array->len == 0 && sink->final_boundary_added) ? FALSE : TRUE;
}
return ret;
}
static void static void
gst_curl_smtp_sink_class_init (GstCurlSmtpSinkClass * klass) gst_curl_smtp_sink_class_init (GstCurlSmtpSinkClass * klass)
{ {
@ -245,6 +271,8 @@ gst_curl_smtp_sink_class_init (GstCurlSmtpSinkClass * klass)
gst_curl_smtp_sink_transfer_data_buffer; gst_curl_smtp_sink_transfer_data_buffer;
gstcurlbasesink_class->flush_data_unlocked = gstcurlbasesink_class->flush_data_unlocked =
gst_curl_smtp_sink_flush_data_unlocked; gst_curl_smtp_sink_flush_data_unlocked;
gstcurlbasesink_class->has_buffered_data_unlocked =
gst_curl_smtp_sink_has_buffered_data_unlocked;
gstbasesink_class->event = gst_curl_smtp_sink_event; gstbasesink_class->event = gst_curl_smtp_sink_event;
gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_curl_smtp_sink_finalize); gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_curl_smtp_sink_finalize);
@ -306,6 +334,7 @@ gst_curl_smtp_sink_init (GstCurlSmtpSink * sink)
g_cond_init (&sink->cond_transfer_end); g_cond_init (&sink->cond_transfer_end);
sink->transfer_end = FALSE; sink->transfer_end = FALSE;
sink->eos = FALSE; sink->eos = FALSE;
sink->final_boundary_added = FALSE;
sink->reset_transfer_options = FALSE; sink->reset_transfer_options = FALSE;
sink->use_ssl = DEFAULT_USE_SSL; sink->use_ssl = DEFAULT_USE_SSL;
@ -710,7 +739,8 @@ gst_curl_smtp_sink_set_mime_type (GstCurlBaseSink * bcsink, GstCaps * caps)
static size_t static size_t
gst_curl_smtp_sink_flush_data_unlocked (GstCurlBaseSink * bcsink, gst_curl_smtp_sink_flush_data_unlocked (GstCurlBaseSink * bcsink,
void *curl_ptr, size_t block_size, gboolean new_file) void *curl_ptr, size_t block_size, gboolean new_file,
gboolean close_transfer)
{ {
GstCurlSmtpSink *sink = GST_CURL_SMTP_SINK (bcsink); GstCurlSmtpSink *sink = GST_CURL_SMTP_SINK (bcsink);
Base64Chunk *chunk = sink->base64_chunk; Base64Chunk *chunk = sink->base64_chunk;
@ -721,10 +751,17 @@ gst_curl_smtp_sink_flush_data_unlocked (GstCurlBaseSink * bcsink,
gint len; gint len;
gchar *data_out; gchar *data_out;
GST_DEBUG ("live: %d, num attachments: %d, num attachments_left: %d, eos: %d, "
"close_transfer: %d, final boundary: %d, array_len: %d", bcsink->is_live,
sink->nbr_attachments, sink->nbr_attachments_left, sink->eos, close_transfer,
sink->final_boundary_added, array->len);
if ((bcsink->is_live && (sink->nbr_attachments_left == sink->nbr_attachments)) if ((bcsink->is_live && (sink->nbr_attachments_left == sink->nbr_attachments))
|| (sink->nbr_attachments == 1) || sink->eos) { || (sink->nbr_attachments == 1) || sink->eos || sink->final_boundary_added) {
bcsink->is_live = FALSE; bcsink->is_live = FALSE;
sink->reset_transfer_options = TRUE; sink->reset_transfer_options = TRUE;
sink->final_boundary_added = FALSE;
GST_DEBUG ("returning 0, no more data to send in this transfer"); GST_DEBUG ("returning 0, no more data to send in this transfer");
@ -759,6 +796,10 @@ gst_curl_smtp_sink_flush_data_unlocked (GstCurlBaseSink * bcsink,
gst_curl_smtp_sink_set_payload_headers_unlocked (bcsink); gst_curl_smtp_sink_set_payload_headers_unlocked (bcsink);
} }
if (close_transfer && !sink->final_boundary_added)
add_final_boundary_unlocked (sink);
bytes_to_send = MIN (block_size, array->len); bytes_to_send = MIN (block_size, array->len);
memcpy ((guint8 *) curl_ptr, array->data, bytes_to_send); memcpy ((guint8 *) curl_ptr, array->data, bytes_to_send);
g_byte_array_remove_range (array, 0, bytes_to_send); g_byte_array_remove_range (array, 0, bytes_to_send);

View file

@ -73,6 +73,7 @@ struct _GstCurlSmtpSink
gint nbr_attachments_left; gint nbr_attachments_left;
gboolean reset_transfer_options; gboolean reset_transfer_options;
gboolean final_boundary_added;
gboolean eos; gboolean eos;
}; };