summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTony Asleson <tasleson@redhat.com>2016-11-02 14:14:56 -0500
committerTony Asleson <tasleson@redhat.com>2016-11-02 16:39:13 -0500
commit96118a2508e1cd3aaf5ba7a0129ff8ecc7ac4af2 (patch)
tree10a4a66a82170abf181dadacae437eba2709a7f7
parent95abadd13c468b56612c1966d28daddfdeee6e2e (diff)
downloadlvm2-96118a2508e1cd3aaf5ba7a0129ff8ecc7ac4af2.tar.gz
lvmdbusd: Stop using threads for job wait
Instead of creating a thread to handle the case where a client is calling job.Wait, we will utilize a timer. This significantly reduces the number of threads that get created and destroyed while the service is running.
-rw-r--r--daemons/lvmdbusd/background.py29
-rw-r--r--daemons/lvmdbusd/job.py83
2 files changed, 81 insertions, 31 deletions
diff --git a/daemons/lvmdbusd/background.py b/daemons/lvmdbusd/background.py
index e8b42feeb..0220b9773 100644
--- a/daemons/lvmdbusd/background.py
+++ b/daemons/lvmdbusd/background.py
@@ -12,8 +12,7 @@ import subprocess
from . import cfg
from .cmdhandler import options_to_cli_args
import dbus
-from .utils import pv_range_append, pv_dest_ranges, log_error, log_debug, \
- mt_async_result
+from .utils import pv_range_append, pv_dest_ranges, log_error, log_debug
import traceback
import os
@@ -184,29 +183,3 @@ def add(command, reporting_job):
with _rlock:
_thread_list.append(t)
-
-def wait_thread(job, timeout, cb, cbe):
- # We need to put the wait on it's own thread, so that we don't block the
- # entire dbus queue processing thread
- try:
- mt_async_result(cb, job.state.Wait(timeout))
- except Exception as e:
- mt_async_result(cbe, "Wait exception: %s" % str(e))
- return 0
-
-
-def add_wait(job, timeout, cb, cbe):
-
- if timeout == 0:
- # Users are basically polling, do not create thread
- mt_async_result(cb, job.Complete)
- else:
- t = threading.Thread(
- target=wait_thread,
- name="thread job.Wait: %s" % job.dbus_object_path(),
- args=(job, timeout, cb, cbe)
- )
-
- t.start()
- with _rlock:
- _thread_list.append(t)
diff --git a/daemons/lvmdbusd/job.py b/daemons/lvmdbusd/job.py
index 1158370c2..81048a630 100644
--- a/daemons/lvmdbusd/job.py
+++ b/daemons/lvmdbusd/job.py
@@ -8,12 +8,54 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from .automatedproperties import AutomatedProperties
-from .utils import job_obj_path_generate
+from .utils import job_obj_path_generate, mt_async_result, log_debug
from . import cfg
from .cfg import JOB_INTERFACE
import dbus
import threading
-from . import background
+from gi.repository import GLib
+
+
+# Class that handles a client waiting for something to be complete. We either
+# get a timeout or the operation is done.
+class WaitingClient(object):
+
+ # A timeout occurred
+ @staticmethod
+ def _timeout(wc):
+ with wc.rlock:
+ if wc.in_use:
+ wc.in_use = False
+ # Remove ourselves from waiting client
+ wc.job_state.remove_waiting_client(wc)
+ wc.timer_id = -1
+ mt_async_result(wc.cb, wc.job_state.Complete)
+ wc.job_state = None
+
+ def __init__(self, job_state, tmo, cb, cbe):
+ self.rlock = threading.RLock()
+ self.job_state = job_state
+ self.cb = cb
+ self.cbe = cbe
+ self.in_use = True # Indicates if object is in play
+ self.timer_id = -1
+ if tmo > 0:
+ self.timer_id = GLib.timeout_add_seconds(
+ tmo, WaitingClient._timeout, self)
+
+ # The job finished before the timer popped and we are being notified that
+ # it's done
+ def notify(self):
+ with self.rlock:
+ if self.in_use:
+ self.in_use = False
+ # Clear timer
+ if self.timer_id != -1:
+ GLib.source_remove(self.timer_id)
+ self.timer_id = -1
+
+ mt_async_result(self.cb, self.job_state.Complete)
+ self.job_state = None
# noinspection PyPep8Naming
@@ -27,6 +69,7 @@ class JobState(object):
self._cond = threading.Condition(self.rlock)
self._ec = 0
self._stderr = ''
+ self._waiting_clients = []
# This is an lvm command that is just taking too long and doesn't
# support background operation
@@ -58,6 +101,7 @@ class JobState(object):
self._complete = value
self._percent = 100
self._cond.notify_all()
+ self.notify_waiting_clients()
@property
def GetError(self):
@@ -101,6 +145,35 @@ class JobState(object):
return self._request.result()
return '/'
+ def add_waiting_client(self, client):
+ with self.rlock:
+ # Avoid race condition where it goes complete before we get added
+ # to the list of waiting clients
+ if self.Complete:
+ client.notify()
+ else:
+ self._waiting_clients.append(client)
+
+ def remove_waiting_client(self, client):
+ # If a waiting client timer pops before the job is done we will allow
+ # the client to remove themselves from the list. As we have a lock
+ # here and a lock in the waiting client too, and they can be obtained
+ # in different orders, a dead lock can occur.
+ # As this remove is really optional, we will try to acquire the lock
+ # and remove. If we are unsuccessful it's not fatal, we just delay
+ # the time when the objects can be garbage collected by python
+ if self.rlock.acquire(False):
+ try:
+ self._waiting_clients.remove(client)
+ finally:
+ self.rlock.release()
+
+ def notify_waiting_clients(self):
+ with self.rlock:
+ for c in self._waiting_clients:
+ c.notify()
+
+ self._waiting_clients = []
# noinspection PyPep8Naming
class Job(AutomatedProperties):
@@ -155,7 +228,11 @@ class Job(AutomatedProperties):
out_signature='b',
async_callbacks=('cb', 'cbe'))
def Wait(self, timeout, cb, cbe):
- background.add_wait(self, timeout, cb, cbe)
+ if timeout == 0 or self.state.Complete:
+ cb(dbus.Boolean(self.state.Complete))
+ else:
+ self.state.add_waiting_client(
+ WaitingClient(self.state, timeout, cb, cbe))
@property
def Result(self):