diff options
author | Sebastian Dröge <sebastian@centricular.com> | 2015-03-19 13:30:00 +0100 |
---|---|---|
committer | Sebastian Dröge <sebastian@centricular.com> | 2015-03-19 13:30:00 +0100 |
commit | fd609f6bc06d89ca28f1b2d8faebf71d981624b5 (patch) | |
tree | 434fe873a2c5640890cfe49dc3f146d46a0b6faa /ext/dtls | |
parent | 4072666c7dac6647588a29c0b0316e1682fe7b08 (diff) | |
download | gstreamer-plugins-bad-fd609f6bc06d89ca28f1b2d8faebf71d981624b5.tar.gz |
dtls: Use a shared thread pool for the timeouts
This way we will share threads with other DTLS connections if possible, and
don't have to start/stop threads for timeouts if there are many to be handled
in a short period of time.
Also use the system clock and async waiting on it for scheduling the timeouts.
Diffstat (limited to 'ext/dtls')
-rw-r--r-- | ext/dtls/gstdtlsconnection.c | 181 | ||||
-rw-r--r-- | ext/dtls/gstdtlsconnection.h | 2 | ||||
-rw-r--r-- | ext/dtls/gstdtlsenc.c | 8 |
3 files changed, 100 insertions, 91 deletions
diff --git a/ext/dtls/gstdtlsconnection.c b/ext/dtls/gstdtlsconnection.c index bdd9a1719..8734c4222 100644 --- a/ext/dtls/gstdtlsconnection.c +++ b/ext/dtls/gstdtlsconnection.c @@ -74,6 +74,9 @@ static GParamSpec *properties[NUM_PROPERTIES]; static int connection_ex_index; +static GstClock *system_clock; +static void handle_timeout (gpointer data, gpointer user_data); + struct _GstDtlsConnectionPrivate { SSL *ssl; @@ -83,7 +86,6 @@ struct _GstDtlsConnectionPrivate gboolean is_client; gboolean is_alive; gboolean keys_exported; - gboolean timeout_set; GMutex mutex; GCond condition; @@ -92,6 +94,9 @@ struct _GstDtlsConnectionPrivate gint bio_buffer_offset; GClosure *send_closure; + + gboolean timeout_pending; + GThreadPool *thread_pool; }; static void gst_dtls_connection_finalize (GObject * gobject); @@ -99,7 +104,6 @@ static void gst_dtls_connection_set_property (GObject *, guint prop_id, const GValue *, GParamSpec *); static void log_state (GstDtlsConnection *, const gchar * str); -static gpointer connection_timeout_thread_func (GstDtlsConnection *); static void export_srtp_keys (GstDtlsConnection *); static void openssl_poll (GstDtlsConnection *); static int openssl_verify_callback (int preverify_ok, @@ -154,6 +158,8 @@ gst_dtls_connection_class_init (GstDtlsConnectionClass * klass) _gst_dtls_init_openssl (); gobject_class->finalize = gst_dtls_connection_finalize; + + system_clock = gst_system_clock_obtain (); } static void @@ -171,7 +177,6 @@ gst_dtls_connection_init (GstDtlsConnection * self) priv->is_client = FALSE; priv->is_alive = TRUE; priv->keys_exported = FALSE; - priv->timeout_set = FALSE; priv->bio_buffer = NULL; priv->bio_buffer_len = 0; @@ -179,6 +184,13 @@ gst_dtls_connection_init (GstDtlsConnection * self) g_mutex_init (&priv->mutex); g_cond_init (&priv->condition); + + /* Thread pool for handling timeouts, we only need one thread for that + * really and share threads with all other thread pools around there as + * this is not going to happen very often */ + priv->thread_pool = g_thread_pool_new (handle_timeout, NULL, 1, FALSE, NULL); + g_assert (priv->thread_pool); + priv->timeout_pending = FALSE; } static void @@ -187,6 +199,8 @@ gst_dtls_connection_finalize (GObject * gobject) GstDtlsConnection *self = GST_DTLS_CONNECTION (gobject); GstDtlsConnectionPrivate *priv = self->priv; + g_thread_pool_free (priv->thread_pool, TRUE, TRUE); + priv->thread_pool = NULL; SSL_free (priv->ssl); priv->ssl = NULL; @@ -258,7 +272,6 @@ gst_dtls_connection_start (GstDtlsConnection * self, gboolean is_client) GST_TRACE_OBJECT (self, "locked @ start"); priv->is_alive = TRUE; - priv->timeout_set = FALSE; priv->bio_buffer = NULL; priv->bio_buffer_len = 0; priv->bio_buffer_offset = 0; @@ -281,34 +294,94 @@ gst_dtls_connection_start (GstDtlsConnection * self, gboolean is_client) } static void -gst_dtls_connection_start_timeout_locked (GstDtlsConnection * self) +handle_timeout (gpointer data, gpointer user_data) +{ + GstDtlsConnection *self = data; + GstDtlsConnectionPrivate *priv; + gint ret; + + priv = self->priv; + + g_mutex_lock (&priv->mutex); + priv->timeout_pending = FALSE; + if (priv->is_alive) { + ret = DTLSv1_handle_timeout (priv->ssl); + + GST_DEBUG_OBJECT (self, "handle timeout returned %d, is_alive: %d", ret, + priv->is_alive); + + if (ret < 0) { + GST_WARNING_OBJECT (self, "handling timeout failed"); + } else if (ret > 0) { + log_state (self, "handling timeout before poll"); + openssl_poll (self); + log_state (self, "handling timeout after poll"); + } + } + g_mutex_unlock (&priv->mutex); + g_object_unref (self); +} + +static gboolean +schedule_timeout_handling (GstClock * clock, GstClockTime time, GstClockID id, + gpointer user_data) +{ + GstDtlsConnection *self = user_data; + + g_mutex_lock (&self->priv->mutex); + if (self->priv->is_alive && !self->priv->timeout_pending) { + self->priv->timeout_pending = TRUE; + + GST_TRACE_OBJECT (self, "Schedule timeout now"); + g_thread_pool_push (self->priv->thread_pool, g_object_ref (self), NULL); + } + g_mutex_unlock (&self->priv->mutex); + + return TRUE; +} + +static void +gst_dtls_connection_check_timeout_locked (GstDtlsConnection * self) { GstDtlsConnectionPrivate *priv; - GError *error = NULL; - gchar *thread_name; + struct timeval timeout; + gint64 end_time, wait_time; g_return_if_fail (GST_IS_DTLS_CONNECTION (self)); priv = self->priv; - if (priv->thread) - return; - thread_name = g_strdup_printf ("connection_thread_%p", self); + if (DTLSv1_get_timeout (priv->ssl, &timeout)) { + wait_time = timeout.tv_sec * G_USEC_PER_SEC + timeout.tv_usec; - GST_INFO_OBJECT (self, "starting connection timeout"); - priv->thread = g_thread_try_new (thread_name, - (GThreadFunc) connection_timeout_thread_func, self, &error); - if (error) { - GST_WARNING_OBJECT (self, "error creating connection thread: %s (%d)", - error->message, error->code); - g_clear_error (&error); - } + GST_DEBUG_OBJECT (self, "waiting for %" G_GINT64_FORMAT " usec", wait_time); + if (wait_time) { + GstClockID clock_id; + GstClockReturn clock_return; - g_free (thread_name); + end_time = gst_clock_get_time (system_clock) + wait_time * GST_USECOND; + + clock_id = gst_clock_new_single_shot_id (system_clock, end_time); + clock_return = + gst_clock_id_wait_async (clock_id, schedule_timeout_handling, + g_object_ref (self), (GDestroyNotify) g_object_unref); + g_assert (clock_return == GST_CLOCK_OK); + gst_clock_id_unref (clock_id); + } else { + if (self->priv->is_alive && !self->priv->timeout_pending) { + self->priv->timeout_pending = TRUE; + GST_TRACE_OBJECT (self, "Schedule timeout now"); + + g_thread_pool_push (self->priv->thread_pool, g_object_ref (self), NULL); + } + } + } else { + GST_DEBUG_OBJECT (self, "no timeout set"); + } } void -gst_dtls_connection_start_timeout (GstDtlsConnection * self) +gst_dtls_connection_check_timeout (GstDtlsConnection * self) { GstDtlsConnectionPrivate *priv; @@ -319,7 +392,7 @@ gst_dtls_connection_start_timeout (GstDtlsConnection * self) GST_TRACE_OBJECT (self, "locking @ start_timeout"); g_mutex_lock (&priv->mutex); GST_TRACE_OBJECT (self, "locked @ start_timeout"); - gst_dtls_connection_start_timeout_locked (self); + gst_dtls_connection_check_timeout_locked (self); g_mutex_unlock (&priv->mutex); GST_TRACE_OBJECT (self, "unlocking @ start_timeout"); } @@ -506,66 +579,6 @@ log_state (GstDtlsConnection * self, const gchar * str) states, SSL_get_state (priv->ssl), SSL_state_string_long (priv->ssl)); } -static gpointer -connection_timeout_thread_func (GstDtlsConnection * self) -{ - GstDtlsConnectionPrivate *priv = self->priv; - struct timeval timeout; - gint64 end_time, wait_time; - gint ret; - - while (priv->is_alive) { - GST_TRACE_OBJECT (self, "locking @ timeout"); - g_mutex_lock (&priv->mutex); - GST_TRACE_OBJECT (self, "locked @ timeout"); - - if (DTLSv1_get_timeout (priv->ssl, &timeout)) { - wait_time = timeout.tv_sec * G_USEC_PER_SEC + timeout.tv_usec; - - if (wait_time) { - GST_DEBUG_OBJECT (self, "waiting for %" G_GINT64_FORMAT " usec", - wait_time); - - end_time = g_get_monotonic_time () + wait_time; - - GST_TRACE_OBJECT (self, "wait @ timeout"); - g_cond_wait_until (&priv->condition, &priv->mutex, end_time); - GST_TRACE_OBJECT (self, "continued @ timeout"); - } - - ret = DTLSv1_handle_timeout (priv->ssl); - - GST_DEBUG_OBJECT (self, "handle timeout returned %d, is_alive: %d", ret, - priv->is_alive); - - if (ret < 0) { - GST_TRACE_OBJECT (self, "unlocking @ timeout failed"); - g_mutex_unlock (&priv->mutex); - break; /* self failed after DTLS1_TMO_ALERT_COUNT (12) attempts */ - } - - if (ret > 0) { - log_state (self, "handling timeout before poll"); - openssl_poll (self); - log_state (self, "handling timeout after poll"); - } - } else { - GST_DEBUG_OBJECT (self, "no timeout set, stopping thread"); - priv->timeout_set = FALSE; - priv->thread = NULL; - g_mutex_unlock (&priv->mutex); - break; - } - - GST_TRACE_OBJECT (self, "unlocking @ timeout"); - g_mutex_unlock (&priv->mutex); - } - - log_state (self, "timeout thread exiting"); - - return NULL; -} - static void export_srtp_keys (GstDtlsConnection * self) { @@ -858,11 +871,7 @@ bio_method_ctrl (BIO * bio, int cmd, long arg1, void *arg2) case BIO_CTRL_DGRAM_SET_NEXT_TIMEOUT: case BIO_CTRL_DGRAM_SET_RECV_TIMEOUT: GST_LOG_OBJECT (self, "BIO: Timeout set"); - priv->timeout_set = TRUE; - if (priv->thread) - g_cond_signal (&priv->condition); - else - gst_dtls_connection_start_timeout_locked (self); + gst_dtls_connection_check_timeout_locked (self); return 1; case BIO_CTRL_RESET: priv->bio_buffer = NULL; diff --git a/ext/dtls/gstdtlsconnection.h b/ext/dtls/gstdtlsconnection.h index 0051dd45e..6260b935b 100644 --- a/ext/dtls/gstdtlsconnection.h +++ b/ext/dtls/gstdtlsconnection.h @@ -85,7 +85,7 @@ struct _GstDtlsConnectionClass { GType gst_dtls_connection_get_type(void) G_GNUC_CONST; void gst_dtls_connection_start(GstDtlsConnection *, gboolean is_client); -void gst_dtls_connection_start_timeout(GstDtlsConnection *); +void gst_dtls_connection_check_timeout(GstDtlsConnection *); /* * Stops the connections, it is not required to call this function. diff --git a/ext/dtls/gstdtlsenc.c b/ext/dtls/gstdtlsenc.c index 2f8d48382..477af1077 100644 --- a/ext/dtls/gstdtlsenc.c +++ b/ext/dtls/gstdtlsenc.c @@ -400,7 +400,7 @@ src_task_loop (GstPad * pad) GstDtlsEnc *self = GST_DTLS_ENC (GST_PAD_PARENT (pad)); GstFlowReturn ret; GstBuffer *buffer; - gboolean start_connection_timeout = FALSE; + gboolean check_connection_timeout = FALSE; GST_TRACE_OBJECT (self, "src loop: acquiring lock"); g_mutex_lock (&self->queue_lock); @@ -444,14 +444,14 @@ src_task_loop (GstPad * pad) gst_caps_unref (caps); gst_segment_init (&segment, GST_FORMAT_BYTES); gst_pad_push_event (self->src, gst_event_new_segment (&segment)); - start_connection_timeout = TRUE; + check_connection_timeout = TRUE; } GST_TRACE_OBJECT (self, "src loop: releasing lock"); ret = gst_pad_push (self->src, buffer); - if (start_connection_timeout) - gst_dtls_connection_start_timeout (self->connection); + if (check_connection_timeout) + gst_dtls_connection_check_timeout (self->connection); if (G_UNLIKELY (ret != GST_FLOW_OK)) { GST_WARNING_OBJECT (self, "failed to push buffer on src pad: %s", |