summaryrefslogtreecommitdiff
path: root/ext/dtls
diff options
context:
space:
mode:
authorSebastian Dröge <sebastian@centricular.com>2015-03-19 13:30:00 +0100
committerSebastian Dröge <sebastian@centricular.com>2015-03-19 13:30:00 +0100
commitfd609f6bc06d89ca28f1b2d8faebf71d981624b5 (patch)
tree434fe873a2c5640890cfe49dc3f146d46a0b6faa /ext/dtls
parent4072666c7dac6647588a29c0b0316e1682fe7b08 (diff)
downloadgstreamer-plugins-bad-fd609f6bc06d89ca28f1b2d8faebf71d981624b5.tar.gz
dtls: Use a shared thread pool for the timeouts
This way we will share threads with other DTLS connections if possible, and don't have to start/stop threads for timeouts if there are many to be handled in a short period of time. Also use the system clock and async waiting on it for scheduling the timeouts.
Diffstat (limited to 'ext/dtls')
-rw-r--r--ext/dtls/gstdtlsconnection.c181
-rw-r--r--ext/dtls/gstdtlsconnection.h2
-rw-r--r--ext/dtls/gstdtlsenc.c8
3 files changed, 100 insertions, 91 deletions
diff --git a/ext/dtls/gstdtlsconnection.c b/ext/dtls/gstdtlsconnection.c
index bdd9a1719..8734c4222 100644
--- a/ext/dtls/gstdtlsconnection.c
+++ b/ext/dtls/gstdtlsconnection.c
@@ -74,6 +74,9 @@ static GParamSpec *properties[NUM_PROPERTIES];
static int connection_ex_index;
+static GstClock *system_clock;
+static void handle_timeout (gpointer data, gpointer user_data);
+
struct _GstDtlsConnectionPrivate
{
SSL *ssl;
@@ -83,7 +86,6 @@ struct _GstDtlsConnectionPrivate
gboolean is_client;
gboolean is_alive;
gboolean keys_exported;
- gboolean timeout_set;
GMutex mutex;
GCond condition;
@@ -92,6 +94,9 @@ struct _GstDtlsConnectionPrivate
gint bio_buffer_offset;
GClosure *send_closure;
+
+ gboolean timeout_pending;
+ GThreadPool *thread_pool;
};
static void gst_dtls_connection_finalize (GObject * gobject);
@@ -99,7 +104,6 @@ static void gst_dtls_connection_set_property (GObject *, guint prop_id,
const GValue *, GParamSpec *);
static void log_state (GstDtlsConnection *, const gchar * str);
-static gpointer connection_timeout_thread_func (GstDtlsConnection *);
static void export_srtp_keys (GstDtlsConnection *);
static void openssl_poll (GstDtlsConnection *);
static int openssl_verify_callback (int preverify_ok,
@@ -154,6 +158,8 @@ gst_dtls_connection_class_init (GstDtlsConnectionClass * klass)
_gst_dtls_init_openssl ();
gobject_class->finalize = gst_dtls_connection_finalize;
+
+ system_clock = gst_system_clock_obtain ();
}
static void
@@ -171,7 +177,6 @@ gst_dtls_connection_init (GstDtlsConnection * self)
priv->is_client = FALSE;
priv->is_alive = TRUE;
priv->keys_exported = FALSE;
- priv->timeout_set = FALSE;
priv->bio_buffer = NULL;
priv->bio_buffer_len = 0;
@@ -179,6 +184,13 @@ gst_dtls_connection_init (GstDtlsConnection * self)
g_mutex_init (&priv->mutex);
g_cond_init (&priv->condition);
+
+ /* Thread pool for handling timeouts, we only need one thread for that
+ * really and share threads with all other thread pools around there as
+ * this is not going to happen very often */
+ priv->thread_pool = g_thread_pool_new (handle_timeout, NULL, 1, FALSE, NULL);
+ g_assert (priv->thread_pool);
+ priv->timeout_pending = FALSE;
}
static void
@@ -187,6 +199,8 @@ gst_dtls_connection_finalize (GObject * gobject)
GstDtlsConnection *self = GST_DTLS_CONNECTION (gobject);
GstDtlsConnectionPrivate *priv = self->priv;
+ g_thread_pool_free (priv->thread_pool, TRUE, TRUE);
+ priv->thread_pool = NULL;
SSL_free (priv->ssl);
priv->ssl = NULL;
@@ -258,7 +272,6 @@ gst_dtls_connection_start (GstDtlsConnection * self, gboolean is_client)
GST_TRACE_OBJECT (self, "locked @ start");
priv->is_alive = TRUE;
- priv->timeout_set = FALSE;
priv->bio_buffer = NULL;
priv->bio_buffer_len = 0;
priv->bio_buffer_offset = 0;
@@ -281,34 +294,94 @@ gst_dtls_connection_start (GstDtlsConnection * self, gboolean is_client)
}
static void
-gst_dtls_connection_start_timeout_locked (GstDtlsConnection * self)
+handle_timeout (gpointer data, gpointer user_data)
+{
+ GstDtlsConnection *self = data;
+ GstDtlsConnectionPrivate *priv;
+ gint ret;
+
+ priv = self->priv;
+
+ g_mutex_lock (&priv->mutex);
+ priv->timeout_pending = FALSE;
+ if (priv->is_alive) {
+ ret = DTLSv1_handle_timeout (priv->ssl);
+
+ GST_DEBUG_OBJECT (self, "handle timeout returned %d, is_alive: %d", ret,
+ priv->is_alive);
+
+ if (ret < 0) {
+ GST_WARNING_OBJECT (self, "handling timeout failed");
+ } else if (ret > 0) {
+ log_state (self, "handling timeout before poll");
+ openssl_poll (self);
+ log_state (self, "handling timeout after poll");
+ }
+ }
+ g_mutex_unlock (&priv->mutex);
+ g_object_unref (self);
+}
+
+static gboolean
+schedule_timeout_handling (GstClock * clock, GstClockTime time, GstClockID id,
+ gpointer user_data)
+{
+ GstDtlsConnection *self = user_data;
+
+ g_mutex_lock (&self->priv->mutex);
+ if (self->priv->is_alive && !self->priv->timeout_pending) {
+ self->priv->timeout_pending = TRUE;
+
+ GST_TRACE_OBJECT (self, "Schedule timeout now");
+ g_thread_pool_push (self->priv->thread_pool, g_object_ref (self), NULL);
+ }
+ g_mutex_unlock (&self->priv->mutex);
+
+ return TRUE;
+}
+
+static void
+gst_dtls_connection_check_timeout_locked (GstDtlsConnection * self)
{
GstDtlsConnectionPrivate *priv;
- GError *error = NULL;
- gchar *thread_name;
+ struct timeval timeout;
+ gint64 end_time, wait_time;
g_return_if_fail (GST_IS_DTLS_CONNECTION (self));
priv = self->priv;
- if (priv->thread)
- return;
- thread_name = g_strdup_printf ("connection_thread_%p", self);
+ if (DTLSv1_get_timeout (priv->ssl, &timeout)) {
+ wait_time = timeout.tv_sec * G_USEC_PER_SEC + timeout.tv_usec;
- GST_INFO_OBJECT (self, "starting connection timeout");
- priv->thread = g_thread_try_new (thread_name,
- (GThreadFunc) connection_timeout_thread_func, self, &error);
- if (error) {
- GST_WARNING_OBJECT (self, "error creating connection thread: %s (%d)",
- error->message, error->code);
- g_clear_error (&error);
- }
+ GST_DEBUG_OBJECT (self, "waiting for %" G_GINT64_FORMAT " usec", wait_time);
+ if (wait_time) {
+ GstClockID clock_id;
+ GstClockReturn clock_return;
- g_free (thread_name);
+ end_time = gst_clock_get_time (system_clock) + wait_time * GST_USECOND;
+
+ clock_id = gst_clock_new_single_shot_id (system_clock, end_time);
+ clock_return =
+ gst_clock_id_wait_async (clock_id, schedule_timeout_handling,
+ g_object_ref (self), (GDestroyNotify) g_object_unref);
+ g_assert (clock_return == GST_CLOCK_OK);
+ gst_clock_id_unref (clock_id);
+ } else {
+ if (self->priv->is_alive && !self->priv->timeout_pending) {
+ self->priv->timeout_pending = TRUE;
+ GST_TRACE_OBJECT (self, "Schedule timeout now");
+
+ g_thread_pool_push (self->priv->thread_pool, g_object_ref (self), NULL);
+ }
+ }
+ } else {
+ GST_DEBUG_OBJECT (self, "no timeout set");
+ }
}
void
-gst_dtls_connection_start_timeout (GstDtlsConnection * self)
+gst_dtls_connection_check_timeout (GstDtlsConnection * self)
{
GstDtlsConnectionPrivate *priv;
@@ -319,7 +392,7 @@ gst_dtls_connection_start_timeout (GstDtlsConnection * self)
GST_TRACE_OBJECT (self, "locking @ start_timeout");
g_mutex_lock (&priv->mutex);
GST_TRACE_OBJECT (self, "locked @ start_timeout");
- gst_dtls_connection_start_timeout_locked (self);
+ gst_dtls_connection_check_timeout_locked (self);
g_mutex_unlock (&priv->mutex);
GST_TRACE_OBJECT (self, "unlocking @ start_timeout");
}
@@ -506,66 +579,6 @@ log_state (GstDtlsConnection * self, const gchar * str)
states, SSL_get_state (priv->ssl), SSL_state_string_long (priv->ssl));
}
-static gpointer
-connection_timeout_thread_func (GstDtlsConnection * self)
-{
- GstDtlsConnectionPrivate *priv = self->priv;
- struct timeval timeout;
- gint64 end_time, wait_time;
- gint ret;
-
- while (priv->is_alive) {
- GST_TRACE_OBJECT (self, "locking @ timeout");
- g_mutex_lock (&priv->mutex);
- GST_TRACE_OBJECT (self, "locked @ timeout");
-
- if (DTLSv1_get_timeout (priv->ssl, &timeout)) {
- wait_time = timeout.tv_sec * G_USEC_PER_SEC + timeout.tv_usec;
-
- if (wait_time) {
- GST_DEBUG_OBJECT (self, "waiting for %" G_GINT64_FORMAT " usec",
- wait_time);
-
- end_time = g_get_monotonic_time () + wait_time;
-
- GST_TRACE_OBJECT (self, "wait @ timeout");
- g_cond_wait_until (&priv->condition, &priv->mutex, end_time);
- GST_TRACE_OBJECT (self, "continued @ timeout");
- }
-
- ret = DTLSv1_handle_timeout (priv->ssl);
-
- GST_DEBUG_OBJECT (self, "handle timeout returned %d, is_alive: %d", ret,
- priv->is_alive);
-
- if (ret < 0) {
- GST_TRACE_OBJECT (self, "unlocking @ timeout failed");
- g_mutex_unlock (&priv->mutex);
- break; /* self failed after DTLS1_TMO_ALERT_COUNT (12) attempts */
- }
-
- if (ret > 0) {
- log_state (self, "handling timeout before poll");
- openssl_poll (self);
- log_state (self, "handling timeout after poll");
- }
- } else {
- GST_DEBUG_OBJECT (self, "no timeout set, stopping thread");
- priv->timeout_set = FALSE;
- priv->thread = NULL;
- g_mutex_unlock (&priv->mutex);
- break;
- }
-
- GST_TRACE_OBJECT (self, "unlocking @ timeout");
- g_mutex_unlock (&priv->mutex);
- }
-
- log_state (self, "timeout thread exiting");
-
- return NULL;
-}
-
static void
export_srtp_keys (GstDtlsConnection * self)
{
@@ -858,11 +871,7 @@ bio_method_ctrl (BIO * bio, int cmd, long arg1, void *arg2)
case BIO_CTRL_DGRAM_SET_NEXT_TIMEOUT:
case BIO_CTRL_DGRAM_SET_RECV_TIMEOUT:
GST_LOG_OBJECT (self, "BIO: Timeout set");
- priv->timeout_set = TRUE;
- if (priv->thread)
- g_cond_signal (&priv->condition);
- else
- gst_dtls_connection_start_timeout_locked (self);
+ gst_dtls_connection_check_timeout_locked (self);
return 1;
case BIO_CTRL_RESET:
priv->bio_buffer = NULL;
diff --git a/ext/dtls/gstdtlsconnection.h b/ext/dtls/gstdtlsconnection.h
index 0051dd45e..6260b935b 100644
--- a/ext/dtls/gstdtlsconnection.h
+++ b/ext/dtls/gstdtlsconnection.h
@@ -85,7 +85,7 @@ struct _GstDtlsConnectionClass {
GType gst_dtls_connection_get_type(void) G_GNUC_CONST;
void gst_dtls_connection_start(GstDtlsConnection *, gboolean is_client);
-void gst_dtls_connection_start_timeout(GstDtlsConnection *);
+void gst_dtls_connection_check_timeout(GstDtlsConnection *);
/*
* Stops the connections, it is not required to call this function.
diff --git a/ext/dtls/gstdtlsenc.c b/ext/dtls/gstdtlsenc.c
index 2f8d48382..477af1077 100644
--- a/ext/dtls/gstdtlsenc.c
+++ b/ext/dtls/gstdtlsenc.c
@@ -400,7 +400,7 @@ src_task_loop (GstPad * pad)
GstDtlsEnc *self = GST_DTLS_ENC (GST_PAD_PARENT (pad));
GstFlowReturn ret;
GstBuffer *buffer;
- gboolean start_connection_timeout = FALSE;
+ gboolean check_connection_timeout = FALSE;
GST_TRACE_OBJECT (self, "src loop: acquiring lock");
g_mutex_lock (&self->queue_lock);
@@ -444,14 +444,14 @@ src_task_loop (GstPad * pad)
gst_caps_unref (caps);
gst_segment_init (&segment, GST_FORMAT_BYTES);
gst_pad_push_event (self->src, gst_event_new_segment (&segment));
- start_connection_timeout = TRUE;
+ check_connection_timeout = TRUE;
}
GST_TRACE_OBJECT (self, "src loop: releasing lock");
ret = gst_pad_push (self->src, buffer);
- if (start_connection_timeout)
- gst_dtls_connection_start_timeout (self->connection);
+ if (check_connection_timeout)
+ gst_dtls_connection_check_timeout (self->connection);
if (G_UNLIKELY (ret != GST_FLOW_OK)) {
GST_WARNING_OBJECT (self, "failed to push buffer on src pad: %s",