diff --git a/ext/curl/gstcurlbasesink.c b/ext/curl/gstcurlbasesink.c index deadad2e5f..905a7b5ac1 100644 --- a/ext/curl/gstcurlbasesink.c +++ b/ext/curl/gstcurlbasesink.c @@ -147,6 +147,26 @@ static size_t transfer_data_buffer (void *curl_ptr, TransferBuffer * buf, #define parent_class gst_curl_base_sink_parent_class G_DEFINE_TYPE (GstCurlBaseSink, gst_curl_base_sink, GST_TYPE_BASE_SINK); +static gboolean +gst_curl_base_sink_default_has_buffered_data_unlocked (GstCurlBaseSink * sink) +{ + return sink->transfer_buf->len > 0; +} + +static gboolean +gst_curl_base_sink_has_buffered_data_unlocked (GstCurlBaseSink * sink) +{ + GstCurlBaseSinkClass *klass; + gboolean res = FALSE; + + klass = GST_CURL_BASE_SINK_GET_CLASS (sink); + + if (klass->has_buffered_data_unlocked) + res = klass->has_buffered_data_unlocked (sink); + + return res; +} + static void gst_curl_base_sink_class_init (GstCurlBaseSinkClass * klass) { @@ -179,6 +199,8 @@ gst_curl_base_sink_class_init (GstCurlBaseSinkClass * klass) klass->handle_transfer = handle_transfer; klass->transfer_read_cb = gst_curl_base_sink_transfer_read_cb; klass->transfer_data_buffer = gst_curl_base_sink_transfer_data_buffer; + klass->has_buffered_data_unlocked = + gst_curl_base_sink_default_has_buffered_data_unlocked; /* FIXME: check against souphttpsrc and use same names for same properties */ g_object_class_install_property (gobject_class, PROP_LOCATION, @@ -685,9 +707,19 @@ gst_curl_base_sink_transfer_read_cb (void *curl_ptr, size_t size, size_t nmemb, * then zero will be returned to indicate end of current transfer */ GST_OBJECT_LOCK (sink); if (gst_curl_base_sink_wait_for_data_unlocked (sink) == FALSE) { + + if (gst_curl_base_sink_has_buffered_data_unlocked (sink) && + sink->transfer_thread_close) { + GST_WARNING_OBJECT (sink, + "discarding render data due to thread close flag"); + + GST_OBJECT_UNLOCK (sink); + return CURL_READFUNC_ABORT; + } + if (klass->flush_data_unlocked) { bytes_to_send = klass->flush_data_unlocked (sink, curl_ptr, - max_bytes_to_send, sink->new_file); + max_bytes_to_send, sink->new_file, sink->transfer_thread_close); GST_OBJECT_UNLOCK (sink); @@ -800,6 +832,13 @@ handle_transfer (GstCurlBaseSink * sink) } else if (errno == EBUSY) { GST_DEBUG_OBJECT (sink, "poll stopped"); retval = GST_FLOW_EOS; + + GST_OBJECT_LOCK (sink); + if (gst_curl_base_sink_has_buffered_data_unlocked (sink)) + GST_WARNING_OBJECT (sink, + "discarding render data due to thread close flag"); + GST_OBJECT_UNLOCK (sink); + goto fail; } else { GST_DEBUG_OBJECT (sink, "poll failed: %s", g_strerror (errno)); diff --git a/ext/curl/gstcurlbasesink.h b/ext/curl/gstcurlbasesink.h index 4096c5c0e4..3def1ba795 100644 --- a/ext/curl/gstcurlbasesink.h +++ b/ext/curl/gstcurlbasesink.h @@ -101,7 +101,8 @@ struct _GstCurlBaseSinkClass size_t (*transfer_data_buffer) (GstCurlBaseSink * sink, void *curl_ptr, size_t block_size, guint * last_chunk); size_t (*flush_data_unlocked) (GstCurlBaseSink * sink, void *curl_ptr, - size_t block_size, gboolean new_file); + size_t block_size, gboolean new_file, gboolean close_transfer); + gboolean (*has_buffered_data_unlocked) (GstCurlBaseSink * sink); }; GType gst_curl_base_sink_get_type (void); diff --git a/ext/curl/gstcurlsmtpsink.c b/ext/curl/gstcurlsmtpsink.c index f796c144aa..2b83faac85 100644 --- a/ext/curl/gstcurlsmtpsink.c +++ b/ext/curl/gstcurlsmtpsink.c @@ -122,7 +122,7 @@ static gboolean gst_curl_smtp_sink_prepare_transfer (GstCurlBaseSink * bcsink); static size_t gst_curl_smtp_sink_transfer_data_buffer (GstCurlBaseSink * sink, void *curl_ptr, size_t block_size, guint * last_chunk); static size_t gst_curl_smtp_sink_flush_data_unlocked (GstCurlBaseSink * bcsink, - void *curl_ptr, size_t block_size, gboolean new_file); + void *curl_ptr, size_t block_size, gboolean new_file, gboolean close_transfer); /* private functions */ @@ -151,15 +151,48 @@ gst_curl_smtp_sink_wait_for_transfer_end_unlocked (GstCurlSmtpSink * sink) GST_LOG ("final data sent"); } +static void +add_final_boundary_unlocked (GstCurlSmtpSink * sink) +{ + GByteArray *array; + gchar *boundary_end; + gsize len; + gint save, state; + gchar *data_out; + + GST_DEBUG ("adding final boundary"); + + array = sink->base64_chunk->chunk_array; + g_assert (array); + + /* it will need up to 5 bytes if line-breaking is enabled + * additional byte is needed for as it is not automatically added by glib */ + data_out = g_malloc (6); + save = sink->base64_chunk->save; + state = sink->base64_chunk->state; + len = g_base64_encode_close (TRUE, data_out, &state, &save); + + /* workaround */ + data_out[len - 1] = '\r'; + data_out[len] = '\n'; + + /* +1 for CR */ + g_byte_array_append (array, (guint8 *) data_out, (guint) (len + 1)); + g_free (data_out); + + boundary_end = g_strdup_printf ("\r\n%s\r\n", BOUNDARY_STRING_END); + g_byte_array_append (array, (guint8 *) boundary_end, strlen (boundary_end)); + g_free (boundary_end); + + sink->final_boundary_added = TRUE; +} + static gboolean gst_curl_smtp_sink_event (GstBaseSink * bsink, GstEvent * event) { GstCurlBaseSink *bcsink = GST_CURL_BASE_SINK (bsink); GstCurlSmtpSink *sink = GST_CURL_SMTP_SINK (bsink); - GByteArray *array; - gchar *boundary_end; - switch (event->type) { case GST_EVENT_EOS: GST_DEBUG_OBJECT (sink, "received EOS"); @@ -169,34 +202,8 @@ gst_curl_smtp_sink_event (GstBaseSink * bsink, GstEvent * event) sink->eos = TRUE; GST_OBJECT_UNLOCK (sink); - if (sink->base64_chunk != NULL) { - gsize len; - gint save, state; - gchar *data_out; - - array = sink->base64_chunk->chunk_array; - g_assert (array); - - GST_DEBUG ("adding final boundary"); - - /* it will need up to 5 bytes if line-breaking is enabled - * additional byte is needed for as it is not automatically added by glib */ - data_out = g_malloc (6); - save = sink->base64_chunk->save; - state = sink->base64_chunk->state; - len = g_base64_encode_close (TRUE, data_out, &state, &save); - /* workaround */ - data_out[len - 1] = '\r'; - data_out[len] = '\n'; - /* +1 for CR */ - g_byte_array_append (array, (guint8 *) data_out, (guint) (len + 1)); - g_free (data_out); - - boundary_end = g_strdup_printf ("\r\n%s\r\n", BOUNDARY_STRING_END); - g_byte_array_append (array, (guint8 *) boundary_end, - strlen (boundary_end)); - g_free (boundary_end); - } + if (sink->base64_chunk != NULL) + add_final_boundary_unlocked (sink); gst_curl_base_sink_transfer_thread_notify_unlocked (bcsink); @@ -217,6 +224,25 @@ gst_curl_smtp_sink_event (GstBaseSink * bsink, GstEvent * event) return GST_BASE_SINK_CLASS (parent_class)->event (bsink, event); } +static gboolean +gst_curl_smtp_sink_has_buffered_data_unlocked (GstCurlBaseSink * bcsink) +{ + GstCurlSmtpSink *sink = GST_CURL_SMTP_SINK (bcsink); + Base64Chunk *chunk; + GByteArray *array = NULL; + gboolean ret = FALSE; + + chunk = sink->base64_chunk; + + if (chunk) { + array = chunk->chunk_array; + if (array) + ret = (array->len == 0 && sink->final_boundary_added) ? FALSE : TRUE; + } + + return ret; +} + static void gst_curl_smtp_sink_class_init (GstCurlSmtpSinkClass * klass) { @@ -245,6 +271,8 @@ gst_curl_smtp_sink_class_init (GstCurlSmtpSinkClass * klass) gst_curl_smtp_sink_transfer_data_buffer; gstcurlbasesink_class->flush_data_unlocked = gst_curl_smtp_sink_flush_data_unlocked; + gstcurlbasesink_class->has_buffered_data_unlocked = + gst_curl_smtp_sink_has_buffered_data_unlocked; gstbasesink_class->event = gst_curl_smtp_sink_event; gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_curl_smtp_sink_finalize); @@ -306,6 +334,7 @@ gst_curl_smtp_sink_init (GstCurlSmtpSink * sink) g_cond_init (&sink->cond_transfer_end); sink->transfer_end = FALSE; sink->eos = FALSE; + sink->final_boundary_added = FALSE; sink->reset_transfer_options = FALSE; sink->use_ssl = DEFAULT_USE_SSL; @@ -710,7 +739,8 @@ gst_curl_smtp_sink_set_mime_type (GstCurlBaseSink * bcsink, GstCaps * caps) static size_t gst_curl_smtp_sink_flush_data_unlocked (GstCurlBaseSink * bcsink, - void *curl_ptr, size_t block_size, gboolean new_file) + void *curl_ptr, size_t block_size, gboolean new_file, + gboolean close_transfer) { GstCurlSmtpSink *sink = GST_CURL_SMTP_SINK (bcsink); Base64Chunk *chunk = sink->base64_chunk; @@ -721,10 +751,17 @@ gst_curl_smtp_sink_flush_data_unlocked (GstCurlBaseSink * bcsink, gint len; gchar *data_out; + GST_DEBUG ("live: %d, num attachments: %d, num attachments_left: %d, eos: %d, " + "close_transfer: %d, final boundary: %d, array_len: %d", bcsink->is_live, + sink->nbr_attachments, sink->nbr_attachments_left, sink->eos, close_transfer, + sink->final_boundary_added, array->len); + + if ((bcsink->is_live && (sink->nbr_attachments_left == sink->nbr_attachments)) - || (sink->nbr_attachments == 1) || sink->eos) { + || (sink->nbr_attachments == 1) || sink->eos || sink->final_boundary_added) { bcsink->is_live = FALSE; sink->reset_transfer_options = TRUE; + sink->final_boundary_added = FALSE; GST_DEBUG ("returning 0, no more data to send in this transfer"); @@ -759,6 +796,10 @@ gst_curl_smtp_sink_flush_data_unlocked (GstCurlBaseSink * bcsink, gst_curl_smtp_sink_set_payload_headers_unlocked (bcsink); } + + if (close_transfer && !sink->final_boundary_added) + add_final_boundary_unlocked (sink); + bytes_to_send = MIN (block_size, array->len); memcpy ((guint8 *) curl_ptr, array->data, bytes_to_send); g_byte_array_remove_range (array, 0, bytes_to_send); diff --git a/ext/curl/gstcurlsmtpsink.h b/ext/curl/gstcurlsmtpsink.h index de57de7d14..94c33498fd 100644 --- a/ext/curl/gstcurlsmtpsink.h +++ b/ext/curl/gstcurlsmtpsink.h @@ -73,6 +73,7 @@ struct _GstCurlSmtpSink gint nbr_attachments_left; gboolean reset_transfer_options; + gboolean final_boundary_added; gboolean eos; };