dtls: Use a shared thread pool for the timeouts

This way we will share threads with other DTLS connections if possible, and
don't have to start/stop threads for timeouts if there are many to be handled
in a short period of time.

Also use the system clock and async waiting on it for scheduling the timeouts.
This commit is contained in:
Sebastian Dröge 2015-03-19 13:30:00 +01:00
parent 4072666c7d
commit fd609f6bc0
3 changed files with 100 additions and 91 deletions

View file

@ -74,6 +74,9 @@ static GParamSpec *properties[NUM_PROPERTIES];
static int connection_ex_index; static int connection_ex_index;
static GstClock *system_clock;
static void handle_timeout (gpointer data, gpointer user_data);
struct _GstDtlsConnectionPrivate struct _GstDtlsConnectionPrivate
{ {
SSL *ssl; SSL *ssl;
@ -83,7 +86,6 @@ struct _GstDtlsConnectionPrivate
gboolean is_client; gboolean is_client;
gboolean is_alive; gboolean is_alive;
gboolean keys_exported; gboolean keys_exported;
gboolean timeout_set;
GMutex mutex; GMutex mutex;
GCond condition; GCond condition;
@ -92,6 +94,9 @@ struct _GstDtlsConnectionPrivate
gint bio_buffer_offset; gint bio_buffer_offset;
GClosure *send_closure; GClosure *send_closure;
gboolean timeout_pending;
GThreadPool *thread_pool;
}; };
static void gst_dtls_connection_finalize (GObject * gobject); static void gst_dtls_connection_finalize (GObject * gobject);
@ -99,7 +104,6 @@ static void gst_dtls_connection_set_property (GObject *, guint prop_id,
const GValue *, GParamSpec *); const GValue *, GParamSpec *);
static void log_state (GstDtlsConnection *, const gchar * str); static void log_state (GstDtlsConnection *, const gchar * str);
static gpointer connection_timeout_thread_func (GstDtlsConnection *);
static void export_srtp_keys (GstDtlsConnection *); static void export_srtp_keys (GstDtlsConnection *);
static void openssl_poll (GstDtlsConnection *); static void openssl_poll (GstDtlsConnection *);
static int openssl_verify_callback (int preverify_ok, static int openssl_verify_callback (int preverify_ok,
@ -154,6 +158,8 @@ gst_dtls_connection_class_init (GstDtlsConnectionClass * klass)
_gst_dtls_init_openssl (); _gst_dtls_init_openssl ();
gobject_class->finalize = gst_dtls_connection_finalize; gobject_class->finalize = gst_dtls_connection_finalize;
system_clock = gst_system_clock_obtain ();
} }
static void static void
@ -171,7 +177,6 @@ gst_dtls_connection_init (GstDtlsConnection * self)
priv->is_client = FALSE; priv->is_client = FALSE;
priv->is_alive = TRUE; priv->is_alive = TRUE;
priv->keys_exported = FALSE; priv->keys_exported = FALSE;
priv->timeout_set = FALSE;
priv->bio_buffer = NULL; priv->bio_buffer = NULL;
priv->bio_buffer_len = 0; priv->bio_buffer_len = 0;
@ -179,6 +184,13 @@ gst_dtls_connection_init (GstDtlsConnection * self)
g_mutex_init (&priv->mutex); g_mutex_init (&priv->mutex);
g_cond_init (&priv->condition); g_cond_init (&priv->condition);
/* Thread pool for handling timeouts, we only need one thread for that
* really and share threads with all other thread pools around there as
* this is not going to happen very often */
priv->thread_pool = g_thread_pool_new (handle_timeout, NULL, 1, FALSE, NULL);
g_assert (priv->thread_pool);
priv->timeout_pending = FALSE;
} }
static void static void
@ -187,6 +199,8 @@ gst_dtls_connection_finalize (GObject * gobject)
GstDtlsConnection *self = GST_DTLS_CONNECTION (gobject); GstDtlsConnection *self = GST_DTLS_CONNECTION (gobject);
GstDtlsConnectionPrivate *priv = self->priv; GstDtlsConnectionPrivate *priv = self->priv;
g_thread_pool_free (priv->thread_pool, TRUE, TRUE);
priv->thread_pool = NULL;
SSL_free (priv->ssl); SSL_free (priv->ssl);
priv->ssl = NULL; priv->ssl = NULL;
@ -258,7 +272,6 @@ gst_dtls_connection_start (GstDtlsConnection * self, gboolean is_client)
GST_TRACE_OBJECT (self, "locked @ start"); GST_TRACE_OBJECT (self, "locked @ start");
priv->is_alive = TRUE; priv->is_alive = TRUE;
priv->timeout_set = FALSE;
priv->bio_buffer = NULL; priv->bio_buffer = NULL;
priv->bio_buffer_len = 0; priv->bio_buffer_len = 0;
priv->bio_buffer_offset = 0; priv->bio_buffer_offset = 0;
@ -281,34 +294,94 @@ gst_dtls_connection_start (GstDtlsConnection * self, gboolean is_client)
} }
static void static void
gst_dtls_connection_start_timeout_locked (GstDtlsConnection * self) handle_timeout (gpointer data, gpointer user_data)
{
GstDtlsConnection *self = data;
GstDtlsConnectionPrivate *priv;
gint ret;
priv = self->priv;
g_mutex_lock (&priv->mutex);
priv->timeout_pending = FALSE;
if (priv->is_alive) {
ret = DTLSv1_handle_timeout (priv->ssl);
GST_DEBUG_OBJECT (self, "handle timeout returned %d, is_alive: %d", ret,
priv->is_alive);
if (ret < 0) {
GST_WARNING_OBJECT (self, "handling timeout failed");
} else if (ret > 0) {
log_state (self, "handling timeout before poll");
openssl_poll (self);
log_state (self, "handling timeout after poll");
}
}
g_mutex_unlock (&priv->mutex);
g_object_unref (self);
}
static gboolean
schedule_timeout_handling (GstClock * clock, GstClockTime time, GstClockID id,
gpointer user_data)
{
GstDtlsConnection *self = user_data;
g_mutex_lock (&self->priv->mutex);
if (self->priv->is_alive && !self->priv->timeout_pending) {
self->priv->timeout_pending = TRUE;
GST_TRACE_OBJECT (self, "Schedule timeout now");
g_thread_pool_push (self->priv->thread_pool, g_object_ref (self), NULL);
}
g_mutex_unlock (&self->priv->mutex);
return TRUE;
}
static void
gst_dtls_connection_check_timeout_locked (GstDtlsConnection * self)
{ {
GstDtlsConnectionPrivate *priv; GstDtlsConnectionPrivate *priv;
GError *error = NULL; struct timeval timeout;
gchar *thread_name; gint64 end_time, wait_time;
g_return_if_fail (GST_IS_DTLS_CONNECTION (self)); g_return_if_fail (GST_IS_DTLS_CONNECTION (self));
priv = self->priv; priv = self->priv;
if (priv->thread)
return;
thread_name = g_strdup_printf ("connection_thread_%p", self); if (DTLSv1_get_timeout (priv->ssl, &timeout)) {
wait_time = timeout.tv_sec * G_USEC_PER_SEC + timeout.tv_usec;
GST_INFO_OBJECT (self, "starting connection timeout"); GST_DEBUG_OBJECT (self, "waiting for %" G_GINT64_FORMAT " usec", wait_time);
priv->thread = g_thread_try_new (thread_name, if (wait_time) {
(GThreadFunc) connection_timeout_thread_func, self, &error); GstClockID clock_id;
if (error) { GstClockReturn clock_return;
GST_WARNING_OBJECT (self, "error creating connection thread: %s (%d)",
error->message, error->code); end_time = gst_clock_get_time (system_clock) + wait_time * GST_USECOND;
g_clear_error (&error);
clock_id = gst_clock_new_single_shot_id (system_clock, end_time);
clock_return =
gst_clock_id_wait_async (clock_id, schedule_timeout_handling,
g_object_ref (self), (GDestroyNotify) g_object_unref);
g_assert (clock_return == GST_CLOCK_OK);
gst_clock_id_unref (clock_id);
} else {
if (self->priv->is_alive && !self->priv->timeout_pending) {
self->priv->timeout_pending = TRUE;
GST_TRACE_OBJECT (self, "Schedule timeout now");
g_thread_pool_push (self->priv->thread_pool, g_object_ref (self), NULL);
}
}
} else {
GST_DEBUG_OBJECT (self, "no timeout set");
} }
g_free (thread_name);
} }
void void
gst_dtls_connection_start_timeout (GstDtlsConnection * self) gst_dtls_connection_check_timeout (GstDtlsConnection * self)
{ {
GstDtlsConnectionPrivate *priv; GstDtlsConnectionPrivate *priv;
@ -319,7 +392,7 @@ gst_dtls_connection_start_timeout (GstDtlsConnection * self)
GST_TRACE_OBJECT (self, "locking @ start_timeout"); GST_TRACE_OBJECT (self, "locking @ start_timeout");
g_mutex_lock (&priv->mutex); g_mutex_lock (&priv->mutex);
GST_TRACE_OBJECT (self, "locked @ start_timeout"); GST_TRACE_OBJECT (self, "locked @ start_timeout");
gst_dtls_connection_start_timeout_locked (self); gst_dtls_connection_check_timeout_locked (self);
g_mutex_unlock (&priv->mutex); g_mutex_unlock (&priv->mutex);
GST_TRACE_OBJECT (self, "unlocking @ start_timeout"); GST_TRACE_OBJECT (self, "unlocking @ start_timeout");
} }
@ -506,66 +579,6 @@ log_state (GstDtlsConnection * self, const gchar * str)
states, SSL_get_state (priv->ssl), SSL_state_string_long (priv->ssl)); states, SSL_get_state (priv->ssl), SSL_state_string_long (priv->ssl));
} }
static gpointer
connection_timeout_thread_func (GstDtlsConnection * self)
{
GstDtlsConnectionPrivate *priv = self->priv;
struct timeval timeout;
gint64 end_time, wait_time;
gint ret;
while (priv->is_alive) {
GST_TRACE_OBJECT (self, "locking @ timeout");
g_mutex_lock (&priv->mutex);
GST_TRACE_OBJECT (self, "locked @ timeout");
if (DTLSv1_get_timeout (priv->ssl, &timeout)) {
wait_time = timeout.tv_sec * G_USEC_PER_SEC + timeout.tv_usec;
if (wait_time) {
GST_DEBUG_OBJECT (self, "waiting for %" G_GINT64_FORMAT " usec",
wait_time);
end_time = g_get_monotonic_time () + wait_time;
GST_TRACE_OBJECT (self, "wait @ timeout");
g_cond_wait_until (&priv->condition, &priv->mutex, end_time);
GST_TRACE_OBJECT (self, "continued @ timeout");
}
ret = DTLSv1_handle_timeout (priv->ssl);
GST_DEBUG_OBJECT (self, "handle timeout returned %d, is_alive: %d", ret,
priv->is_alive);
if (ret < 0) {
GST_TRACE_OBJECT (self, "unlocking @ timeout failed");
g_mutex_unlock (&priv->mutex);
break; /* self failed after DTLS1_TMO_ALERT_COUNT (12) attempts */
}
if (ret > 0) {
log_state (self, "handling timeout before poll");
openssl_poll (self);
log_state (self, "handling timeout after poll");
}
} else {
GST_DEBUG_OBJECT (self, "no timeout set, stopping thread");
priv->timeout_set = FALSE;
priv->thread = NULL;
g_mutex_unlock (&priv->mutex);
break;
}
GST_TRACE_OBJECT (self, "unlocking @ timeout");
g_mutex_unlock (&priv->mutex);
}
log_state (self, "timeout thread exiting");
return NULL;
}
static void static void
export_srtp_keys (GstDtlsConnection * self) export_srtp_keys (GstDtlsConnection * self)
{ {
@ -858,11 +871,7 @@ bio_method_ctrl (BIO * bio, int cmd, long arg1, void *arg2)
case BIO_CTRL_DGRAM_SET_NEXT_TIMEOUT: case BIO_CTRL_DGRAM_SET_NEXT_TIMEOUT:
case BIO_CTRL_DGRAM_SET_RECV_TIMEOUT: case BIO_CTRL_DGRAM_SET_RECV_TIMEOUT:
GST_LOG_OBJECT (self, "BIO: Timeout set"); GST_LOG_OBJECT (self, "BIO: Timeout set");
priv->timeout_set = TRUE; gst_dtls_connection_check_timeout_locked (self);
if (priv->thread)
g_cond_signal (&priv->condition);
else
gst_dtls_connection_start_timeout_locked (self);
return 1; return 1;
case BIO_CTRL_RESET: case BIO_CTRL_RESET:
priv->bio_buffer = NULL; priv->bio_buffer = NULL;

View file

@ -85,7 +85,7 @@ struct _GstDtlsConnectionClass {
GType gst_dtls_connection_get_type(void) G_GNUC_CONST; GType gst_dtls_connection_get_type(void) G_GNUC_CONST;
void gst_dtls_connection_start(GstDtlsConnection *, gboolean is_client); void gst_dtls_connection_start(GstDtlsConnection *, gboolean is_client);
void gst_dtls_connection_start_timeout(GstDtlsConnection *); void gst_dtls_connection_check_timeout(GstDtlsConnection *);
/* /*
* Stops the connections, it is not required to call this function. * Stops the connections, it is not required to call this function.

View file

@ -400,7 +400,7 @@ src_task_loop (GstPad * pad)
GstDtlsEnc *self = GST_DTLS_ENC (GST_PAD_PARENT (pad)); GstDtlsEnc *self = GST_DTLS_ENC (GST_PAD_PARENT (pad));
GstFlowReturn ret; GstFlowReturn ret;
GstBuffer *buffer; GstBuffer *buffer;
gboolean start_connection_timeout = FALSE; gboolean check_connection_timeout = FALSE;
GST_TRACE_OBJECT (self, "src loop: acquiring lock"); GST_TRACE_OBJECT (self, "src loop: acquiring lock");
g_mutex_lock (&self->queue_lock); g_mutex_lock (&self->queue_lock);
@ -444,14 +444,14 @@ src_task_loop (GstPad * pad)
gst_caps_unref (caps); gst_caps_unref (caps);
gst_segment_init (&segment, GST_FORMAT_BYTES); gst_segment_init (&segment, GST_FORMAT_BYTES);
gst_pad_push_event (self->src, gst_event_new_segment (&segment)); gst_pad_push_event (self->src, gst_event_new_segment (&segment));
start_connection_timeout = TRUE; check_connection_timeout = TRUE;
} }
GST_TRACE_OBJECT (self, "src loop: releasing lock"); GST_TRACE_OBJECT (self, "src loop: releasing lock");
ret = gst_pad_push (self->src, buffer); ret = gst_pad_push (self->src, buffer);
if (start_connection_timeout) if (check_connection_timeout)
gst_dtls_connection_start_timeout (self->connection); gst_dtls_connection_check_timeout (self->connection);
if (G_UNLIKELY (ret != GST_FLOW_OK)) { if (G_UNLIKELY (ret != GST_FLOW_OK)) {
GST_WARNING_OBJECT (self, "failed to push buffer on src pad: %s", GST_WARNING_OBJECT (self, "failed to push buffer on src pad: %s",