summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTony Asleson <tasleson@redhat.com>2016-11-01 17:48:39 -0500
committerTony Asleson <tasleson@redhat.com>2016-11-02 16:38:00 -0500
commit24803bbaadd8bd497e2ca337786b01725df61736 (patch)
tree7bd5b2da144bbdadb8b037220aa31800e13e097b
parentc8e8439b3dc037fd73a2eab94e7e796cfb8283a1 (diff)
downloadlvm2-24803bbaadd8bd497e2ca337786b01725df61736.tar.gz
lvmdbusd: Return results in main thread
Also introduce some additional new code to execute code other code in main thread too. ref. https://bugs.freedesktop.org/show_bug.cgi?id=98521
-rw-r--r--daemons/lvmdbusd/background.py9
-rw-r--r--daemons/lvmdbusd/request.py12
-rw-r--r--daemons/lvmdbusd/utils.py63
3 files changed, 74 insertions, 10 deletions
diff --git a/daemons/lvmdbusd/background.py b/daemons/lvmdbusd/background.py
index ab0ac2a54..e8b42feeb 100644
--- a/daemons/lvmdbusd/background.py
+++ b/daemons/lvmdbusd/background.py
@@ -12,7 +12,8 @@ import subprocess
from . import cfg
from .cmdhandler import options_to_cli_args
import dbus
-from .utils import pv_range_append, pv_dest_ranges, log_error, log_debug
+from .utils import pv_range_append, pv_dest_ranges, log_error, log_debug, \
+ mt_async_result
import traceback
import os
@@ -188,9 +189,9 @@ def wait_thread(job, timeout, cb, cbe):
# We need to put the wait on it's own thread, so that we don't block the
# entire dbus queue processing thread
try:
- cb(job.state.Wait(timeout))
+ mt_async_result(cb, job.state.Wait(timeout))
except Exception as e:
- cbe("Wait exception: %s" % str(e))
+ mt_async_result(cbe, "Wait exception: %s" % str(e))
return 0
@@ -198,7 +199,7 @@ def add_wait(job, timeout, cb, cbe):
if timeout == 0:
# Users are basically polling, do not create thread
- cb(job.Complete)
+ mt_async_result(cb, job.Complete)
else:
t = threading.Thread(
target=wait_thread,
diff --git a/daemons/lvmdbusd/request.py b/daemons/lvmdbusd/request.py
index c48d043bc..ca45e8c98 100644
--- a/daemons/lvmdbusd/request.py
+++ b/daemons/lvmdbusd/request.py
@@ -13,7 +13,7 @@ from gi.repository import GLib
from .job import Job
from . import cfg
import traceback
-from .utils import log_error
+from .utils import log_error, mt_async_result
class RequestEntry(object):
@@ -57,9 +57,9 @@ class RequestEntry(object):
self._job = Job(self, self._job_state)
cfg.om.register_object(self._job, True)
if self._return_tuple:
- self.cb(('/', self._job.dbus_object_path()))
+ mt_async_result(self.cb, ('/', self._job.dbus_object_path()))
else:
- self.cb(self._job.dbus_object_path())
+ mt_async_result(self.cb, self._job.dbus_object_path())
def run_cmd(self):
try:
@@ -110,9 +110,9 @@ class RequestEntry(object):
if error_rc == 0:
if self.cb:
if self._return_tuple:
- self.cb((result, '/'))
+ mt_async_result(self.cb, (result, '/'))
else:
- self.cb(result)
+ mt_async_result(self.cb, result)
else:
if self.cb_error:
if not error_exception:
@@ -123,7 +123,7 @@ class RequestEntry(object):
else:
error_exception = Exception(error_msg)
- self.cb_error(error_exception)
+ mt_async_result(self.cb_error, error_exception)
else:
# We have a job and it's complete, indicate that it's done.
# TODO: We need to signal the job is done too.
diff --git a/daemons/lvmdbusd/utils.py b/daemons/lvmdbusd/utils.py
index ef7e61f6f..12b797e1f 100644
--- a/daemons/lvmdbusd/utils.py
+++ b/daemons/lvmdbusd/utils.py
@@ -17,6 +17,8 @@ import datetime
import dbus
from lvmdbusd import cfg
+from gi.repository import GLib
+import threading
STDOUT_TTY = os.isatty(sys.stdout.fileno())
@@ -494,3 +496,64 @@ def validate_tag(interface, tag):
raise dbus.exceptions.DBusException(
interface, 'tag (%s) contains invalid character, allowable set(%s)'
% (tag, _ALLOWABLE_TAG_CH))
+
+
+# The methods below which start with mt_* are used to execute the desired code
+# on the the main thread of execution to alleviate any issues the dbus-python
+# library with regards to multi-threaded access. Essentially, we are trying to
+# ensure all dbus library interaction is done from the same thread!
+
+
+def _async_result(call_back, results):
+ log_debug('Results = %s' % str(results))
+ call_back(results)
+
+# Return result in main thread
+def mt_async_result(call_back, results):
+ GLib.idle_add(_async_result, call_back, results)
+
+
+# Run the supplied function and arguments on the main thread and wait for them
+# to complete while allowing the ability to get the return value too.
+#
+# Example:
+# result = MThreadRunner(foo, arg1, arg2).done()
+#
+class MThreadRunner(object):
+
+ @staticmethod
+ def runner(obj):
+ obj._run()
+ with obj.cond:
+ obj.function_complete = True
+ obj.cond.notify_all()
+
+ def __init__(self, function, *args):
+ self.f = function
+ self.rc = None
+ self.args = args
+ self.function_complete = False
+ self.cond = threading.Condition(threading.Lock())
+
+ def done(self):
+ GLib.idle_add(MThreadRunner.runner, self)
+ with self.cond:
+ if not self.function_complete:
+ self.cond.wait()
+ return self.rc
+
+ def _run(self):
+ if len(self.args):
+ self.rc = self.f(*self.args)
+ else:
+ self.rc = self.f()
+
+
+def _remove_objects(dbus_objects_rm):
+ for o in dbus_objects_rm:
+ cfg.om.remove_object(o, emit_signal=True)
+
+
+# Remove dbus objects from main thread
+def mt_remove_dbus_objects(objs):
+ MThreadRunner(_remove_objects, objs).done()