summaryrefslogtreecommitdiff
path: root/tests/functional-tests/common/utils
diff options
context:
space:
mode:
Diffstat (limited to 'tests/functional-tests/common/utils')
-rw-r--r--tests/functional-tests/common/utils/applicationstest.py85
-rw-r--r--tests/functional-tests/common/utils/dconf.py18
-rw-r--r--tests/functional-tests/common/utils/expectedFailure.py44
-rw-r--r--tests/functional-tests/common/utils/extractor.py109
-rw-r--r--tests/functional-tests/common/utils/helpers.py422
-rw-r--r--tests/functional-tests/common/utils/html.py102
-rw-r--r--tests/functional-tests/common/utils/minertest.py36
-rw-r--r--tests/functional-tests/common/utils/options.py8
-rw-r--r--tests/functional-tests/common/utils/storetest.py28
-rw-r--r--tests/functional-tests/common/utils/system.py215
-rw-r--r--tests/functional-tests/common/utils/writebacktest.py78
11 files changed, 604 insertions, 541 deletions
diff --git a/tests/functional-tests/common/utils/applicationstest.py b/tests/functional-tests/common/utils/applicationstest.py
index 72a8b84c8..ad8e51cf1 100644
--- a/tests/functional-tests/common/utils/applicationstest.py
+++ b/tests/functional-tests/common/utils/applicationstest.py
@@ -28,7 +28,8 @@ import shutil
import os
import time
-APPLICATIONS_TMP_DIR = os.path.join (cfg.TEST_MONITORED_TMP_DIR, "test-applications-monitored")
+APPLICATIONS_TMP_DIR = os.path.join(
+ cfg.TEST_MONITORED_TMP_DIR, "test-applications-monitored")
index_dirs = [APPLICATIONS_TMP_DIR]
CONF_OPTIONS = {
@@ -43,86 +44,84 @@ CONF_OPTIONS = {
# Copy rate, 10KBps (1024b/100ms)
SLOWCOPY_RATE = 1024
+
class CommonTrackerApplicationTest (ut.TestCase):
- def get_urn_count_by_url (self, url):
+ def get_urn_count_by_url(self, url):
select = """
SELECT ?u WHERE { ?u nie:url \"%s\" }
""" % (url)
- return len (self.tracker.query (select))
-
+ return len(self.tracker.query(select))
- def get_test_image (self):
+ def get_test_image(self):
TEST_IMAGE = "test-image-1.jpg"
return TEST_IMAGE
- def get_test_video (self):
+ def get_test_video(self):
TEST_VIDEO = "test-video-1.mp4"
return TEST_VIDEO
- def get_test_music (self):
- TEST_AUDIO = "test-music-1.mp3"
+ def get_test_music(self):
+ TEST_AUDIO = "test-music-1.mp3"
return TEST_AUDIO
- def get_data_dir (self):
+ def get_data_dir(self):
return self.datadir
- def get_dest_dir (self):
+ def get_dest_dir(self):
return APPLICATIONS_TMP_DIR
- def slowcopy_file_fd (self, src, fdest, rate=SLOWCOPY_RATE):
+ def slowcopy_file_fd(self, src, fdest, rate=SLOWCOPY_RATE):
"""
@rate: bytes per 100ms
"""
- log ("Copying slowly\n '%s' to\n '%s'" % (src, fdest.name))
- fsrc = open (src, 'rb')
- buffer_ = fsrc.read (rate)
+ log("Copying slowly\n '%s' to\n '%s'" % (src, fdest.name))
+ fsrc = open(src, 'rb')
+ buffer_ = fsrc.read(rate)
while (buffer_ != ""):
- fdest.write (buffer_)
- time.sleep (0.1)
- buffer_ = fsrc.read (rate)
- fsrc.close ()
-
+ fdest.write(buffer_)
+ time.sleep(0.1)
+ buffer_ = fsrc.read(rate)
+ fsrc.close()
- def slowcopy_file (self, src, dst, rate=SLOWCOPY_RATE):
+ def slowcopy_file(self, src, dst, rate=SLOWCOPY_RATE):
"""
@rate: bytes per 100ms
"""
- fdest = open (dst, 'wb')
- self.slowcopy_file_fd (src, fdest, rate)
- fdest.close ()
+ fdest = open(dst, 'wb')
+ self.slowcopy_file_fd(src, fdest, rate)
+ fdest.close()
@classmethod
- def setUp (self):
+ def setUp(self):
# Create temp directory to monitor
- if (os.path.exists (APPLICATIONS_TMP_DIR)):
- shutil.rmtree (APPLICATIONS_TMP_DIR)
- os.makedirs (APPLICATIONS_TMP_DIR)
+ if (os.path.exists(APPLICATIONS_TMP_DIR)):
+ shutil.rmtree(APPLICATIONS_TMP_DIR)
+ os.makedirs(APPLICATIONS_TMP_DIR)
# Use local directory if available. Installation otherwise.
- if os.path.exists (os.path.join (os.getcwd (),
- "test-apps-data")):
- self.datadir = os.path.join (os.getcwd (),
- "test-apps-data")
+ if os.path.exists(os.path.join(os.getcwd(),
+ "test-apps-data")):
+ self.datadir = os.path.join(os.getcwd(),
+ "test-apps-data")
else:
- self.datadir = os.path.join (cfg.DATADIR,
- "tracker-tests",
- "test-apps-data")
-
+ self.datadir = os.path.join(cfg.DATADIR,
+ "tracker-tests",
+ "test-apps-data")
- self.system = TrackerSystemAbstraction ()
- self.system.tracker_all_testing_start (CONF_OPTIONS)
+ self.system = TrackerSystemAbstraction()
+ self.system.tracker_all_testing_start(CONF_OPTIONS)
# Returns when ready
self.tracker = self.system.store
- log ("Ready to go!")
+ log("Ready to go!")
@classmethod
- def tearDown (self):
- #print "Stopping the daemon in test mode (Doing nothing now)"
- self.system.tracker_all_testing_stop ()
+ def tearDown(self):
+ # print "Stopping the daemon in test mode (Doing nothing now)"
+ self.system.tracker_all_testing_stop()
# Remove monitored directory
- if (os.path.exists (APPLICATIONS_TMP_DIR)):
- shutil.rmtree (APPLICATIONS_TMP_DIR)
+ if (os.path.exists(APPLICATIONS_TMP_DIR)):
+ shutil.rmtree(APPLICATIONS_TMP_DIR)
diff --git a/tests/functional-tests/common/utils/dconf.py b/tests/functional-tests/common/utils/dconf.py
index 0af94cecb..986aeee06 100644
--- a/tests/functional-tests/common/utils/dconf.py
+++ b/tests/functional-tests/common/utils/dconf.py
@@ -5,7 +5,9 @@ import os
from helpers import log
+
class DConfClient(object):
+
"""
Allow changing Tracker configuration in DConf.
@@ -20,7 +22,7 @@ class DConfClient(object):
break.
"""
- def __init__ (self, schema):
+ def __init__(self, schema):
self._settings = Gio.Settings.new(schema)
backend = self._settings.get_property('backend')
@@ -69,10 +71,10 @@ class DConfClient(object):
# XDG_CONFIG_HOME is useless, so we use HOME. This code should not be
# needed unless for some reason the test is not being run via the
# 'test-runner.sh' script.
- dconf_db = os.path.join (os.environ ["HOME"],
- ".config",
- "dconf",
- "trackertest")
- if os.path.exists (dconf_db):
- log ("[Conf] Removing dconf database: " + dconf_db)
- os.remove (dconf_db)
+ dconf_db = os.path.join(os.environ["HOME"],
+ ".config",
+ "dconf",
+ "trackertest")
+ if os.path.exists(dconf_db):
+ log("[Conf] Removing dconf database: " + dconf_db)
+ os.remove(dconf_db)
diff --git a/tests/functional-tests/common/utils/expectedFailure.py b/tests/functional-tests/common/utils/expectedFailure.py
index a496ee21d..bdc762a50 100644
--- a/tests/functional-tests/common/utils/expectedFailure.py
+++ b/tests/functional-tests/common/utils/expectedFailure.py
@@ -1,25 +1,25 @@
#!/usr/bin/python
-## Code taken and modified from unittest2 framework (case.py)
+# Code taken and modified from unittest2 framework (case.py)
-## Copyright (c) 1999-2003 Steve Purcell
-## Copyright (c) 2003-2010 Python Software Foundation
-## Copyright (c) 2010, Nokia (ivan.frade@nokia.com)
+# Copyright (c) 1999-2003 Steve Purcell
+# Copyright (c) 2003-2010 Python Software Foundation
+# Copyright (c) 2010, Nokia (ivan.frade@nokia.com)
-## This module is free software, and you may redistribute it and/or modify
-## it under the same terms as Python itself, so long as this copyright message
-## and disclaimer are retained in their original form.
+# This module is free software, and you may redistribute it and/or modify
+# it under the same terms as Python itself, so long as this copyright message
+# and disclaimer are retained in their original form.
-## IN NO EVENT SHALL THE AUTHOR BE LIABLE TO ANY PARTY FOR DIRECT, INDIRECT,
-## SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT OF THE USE OF
-## THIS CODE, EVEN IF THE AUTHOR HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH
-## DAMAGE.
+# IN NO EVENT SHALL THE AUTHOR BE LIABLE TO ANY PARTY FOR DIRECT, INDIRECT,
+# SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT OF THE USE OF
+# THIS CODE, EVEN IF THE AUTHOR HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH
+# DAMAGE.
-## THE AUTHOR SPECIFICALLY DISCLAIMS ANY WARRANTIES, INCLUDING, BUT NOT
-## LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
-## PARTICULAR PURPOSE. THE CODE PROVIDED HEREUNDER IS ON AN "AS IS" BASIS,
-## AND THERE IS NO OBLIGATION WHATSOEVER TO PROVIDE MAINTENANCE,
-## SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
+# THE AUTHOR SPECIFICALLY DISCLAIMS ANY WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
+# PARTICULAR PURPOSE. THE CODE PROVIDED HEREUNDER IS ON AN "AS IS" BASIS,
+# AND THERE IS NO OBLIGATION WHATSOEVER TO PROVIDE MAINTENANCE,
+# SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
"""
Write values in tracker and check the actual values are written
@@ -30,6 +30,7 @@ import unittest2 as ut
from unittest2.compatibility import wraps
import configuration as cfg
+
def expectedFailureBug(bugnumber):
"""
Decorator to mark bugs with ExpectedFailure. In case that a expected failure PASS
@@ -37,22 +38,24 @@ def expectedFailureBug(bugnumber):
Keep your bugs and tests in sync!
"""
- def decorator (func):
+ def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
try:
func(*args, **kwargs)
except Exception:
raise ut.case._ExpectedFailure(sys.exc_info())
- raise Exception ("Unexpected success. This should fail because of bug " +str(bugnumber))
+ raise Exception(
+ "Unexpected success. This should fail because of bug " + str(bugnumber))
return wrapper
return decorator
+
def expectedFailureJournal():
"""
Decorator to handle tests that are expected to fail when journal is disabled.
"""
- def decorator (func):
+ def decorator(func):
# no wrapping if journal is enabled, test is expected to pass
if not cfg.disableJournal:
return func
@@ -63,6 +66,7 @@ def expectedFailureJournal():
func(*args, **kwargs)
except Exception:
raise ut.case._ExpectedFailure(sys.exc_info())
- raise Exception ("Unexpected success. This should fail because journal is disabled")
+ raise Exception(
+ "Unexpected success. This should fail because journal is disabled")
return wrapper
return decorator
diff --git a/tests/functional-tests/common/utils/extractor.py b/tests/functional-tests/common/utils/extractor.py
index 8dd05604e..36ed0b744 100644
--- a/tests/functional-tests/common/utils/extractor.py
+++ b/tests/functional-tests/common/utils/extractor.py
@@ -26,6 +26,7 @@ import subprocess
class ExtractorParser(object):
+
def parse_tracker_extract_output(self, text):
"""
Parse stdout of `tracker-extract --file` to get SPARQL statements.
@@ -47,9 +48,9 @@ class ExtractorParser(object):
value = extras[value]
if metadata.has_key(att):
- metadata [att].append(value)
+ metadata[att].append(value)
else:
- metadata [att] = [value]
+ metadata[att] = [value]
return metadata
@@ -104,7 +105,7 @@ class ExtractorParser(object):
grouped_lines = []
current_line = ""
anon_node_open = False
- for l in embedded.split ("\n\t"):
+ for l in embedded.split("\n\t"):
if "[" in l:
current_line = current_line + l
anon_node_open = True
@@ -113,7 +114,7 @@ class ExtractorParser(object):
if "]" in l:
anon_node_open = False
current_line += l
- final_lines = self.__handle_anon_nodes (current_line.strip ())
+ final_lines = self.__handle_anon_nodes(current_line.strip())
grouped_lines = grouped_lines + final_lines
current_line = ""
continue
@@ -121,23 +122,24 @@ class ExtractorParser(object):
if anon_node_open:
current_line += l
else:
- if (len (l.strip ()) == 0):
+ if (len(l.strip()) == 0):
continue
-
- final_lines = self.__handle_multivalues (l.strip ())
+
+ final_lines = self.__handle_multivalues(l.strip())
grouped_lines = grouped_lines + final_lines
- return map (self.__clean_value, grouped_lines)
+ return map(self.__clean_value, grouped_lines)
def __process_where_part(self, where):
- gettags = re.compile ("(\?\w+)\ a\ nao:Tag\ ;\ nao:prefLabel\ \"([\w\ -]+)\"")
+ gettags = re.compile(
+ "(\?\w+)\ a\ nao:Tag\ ;\ nao:prefLabel\ \"([\w\ -]+)\"")
tags = {}
- for l in where.split ("\n"):
- if len (l) == 0:
+ for l in where.split("\n"):
+ if len(l) == 0:
continue
- match = gettags.search (l)
+ match = gettags.search(l)
if (match):
- tags [match.group(1)] = match.group (2)
+ tags[match.group(1)] = match.group(2)
else:
print "This line is not a tag:", l
@@ -150,17 +152,17 @@ class ExtractorParser(object):
-> a nfo:Image ;
-> a nmm:Photo ;
"""
- hasEscapedComma = re.compile ("\".+,.+\"")
+ hasEscapedComma = re.compile("\".+,.+\"")
- if "," in line and not hasEscapedComma.search (line):
- prop, multival = line.split (" ", 1)
+ if "," in line and not hasEscapedComma.search(line):
+ prop, multival = line.split(" ", 1)
results = []
- for value in multival.split (","):
- results.append ("%s %s" % (prop, value.strip ()))
+ for value in multival.split(","):
+ results.append("%s %s" % (prop, value.strip()))
return results
else:
return [line]
-
+
def __handle_anon_nodes(self, line):
"""
Traslates anonymous nodes in 'flat' properties:
@@ -175,11 +177,11 @@ class ExtractorParser(object):
-> nfo:hasMediaFileListEntry:entryUrl "file://x.mp3"
"""
-
+
# hasTag case
- if line.startswith ("nao:hasTag"):
- getlabel = re.compile ("nao:prefLabel\ \"([\w\ -]+)\"")
- match = getlabel.search (line)
+ if line.startswith("nao:hasTag"):
+ getlabel = re.compile("nao:prefLabel\ \"([\w\ -]+)\"")
+ match = getlabel.search(line)
if (match):
line = 'nao:hasTag:prefLabel "%s" ;' % (match.group(1))
return [line]
@@ -188,32 +190,34 @@ class ExtractorParser(object):
return [line]
# location case
- elif line.startswith ("slo:location"):
+ elif line.startswith("slo:location"):
results = []
# Can have country AND/OR city
- getpa = re.compile ("slo:postalAddress\ \<([\w:-]+)\>")
- pa_match = getpa.search (line)
-
+ getpa = re.compile("slo:postalAddress\ \<([\w:-]+)\>")
+ pa_match = getpa.search(line)
+
if (pa_match):
- results.append ('slo:location:postalAddress "%s" ;' % (pa_match.group(1)))
+ results.append(
+ 'slo:location:postalAddress "%s" ;' % (pa_match.group(1)))
else:
print "FIXME another location subproperty in ", line
return results
- elif line.startswith ("nco:creator"):
- getcreator = re.compile ("nco:fullname\ \"([\w\ ]+)\"")
- creator_match = getcreator.search (line)
+ elif line.startswith("nco:creator"):
+ getcreator = re.compile("nco:fullname\ \"([\w\ ]+)\"")
+ creator_match = getcreator.search(line)
if (creator_match):
- new_line = 'nco:creator:fullname "%s" ;' % (creator_match.group (1))
+ new_line = 'nco:creator:fullname "%s" ;' % (
+ creator_match.group(1))
return [new_line]
else:
print "Something special in this line '%s'" % (line)
- elif line.startswith ("nfo:hasMediaFileListEntry"):
- return self.__handle_playlist_entries (line)
-
+ elif line.startswith("nfo:hasMediaFileListEntry"):
+ return self.__handle_playlist_entries(line)
+
else:
return [line]
@@ -225,14 +229,15 @@ class ExtractorParser(object):
-> nfo:hMFLE:entryUrl '...'
...
"""
- geturl = re.compile ("nfo:entryUrl \"([\w\.\:\/]+)\"")
- entries = line.strip () [len ("nfo:hasMediaFileListEntry"):]
+ geturl = re.compile("nfo:entryUrl \"([\w\.\:\/]+)\"")
+ entries = line.strip()[len("nfo:hasMediaFileListEntry"):]
results = []
- for entry in entries.split (","):
- url_match = geturl.search (entry)
+ for entry in entries.split(","):
+ url_match = geturl.search(entry)
if (url_match):
- new_line = 'nfo:hasMediaFileListEntry:entryUrl "%s" ;' % (url_match.group (1))
- results.append (new_line)
+ new_line = 'nfo:hasMediaFileListEntry:entryUrl "%s" ;' % (
+ url_match.group(1))
+ results.append(new_line)
else:
print " *** Something special in this line '%s'" % (entry)
return results
@@ -241,16 +246,16 @@ class ExtractorParser(object):
"""
the value comes with a ';' or a '.' at the end
"""
- if (len (value) < 2):
- return value.strip ()
-
- clean = value.strip ()
+ if (len(value) < 2):
+ return value.strip()
+
+ clean = value.strip()
if value[-1] in [';', '.']:
- clean = value [:-1]
+ clean = value[:-1]
+
+ clean = clean.replace("\"", "")
- clean = clean.replace ("\"", "")
-
- return clean.strip ()
+ return clean.strip()
def get_tracker_extract_output(filename, mime_type=None):
@@ -258,14 +263,14 @@ def get_tracker_extract_output(filename, mime_type=None):
Runs `tracker-extract --file` to extract metadata from a file.
"""
- tracker_extract = os.path.join (cfg.EXEC_PREFIX, 'tracker-extract')
+ tracker_extract = os.path.join(cfg.EXEC_PREFIX, 'tracker-extract')
command = [tracker_extract, '--file', filename]
if mime_type is not None:
command.extend(['--mime', mime_type])
try:
- log ('Running: %s' % ' '.join(command))
- output = subprocess.check_output (command)
+ log('Running: %s' % ' '.join(command))
+ output = subprocess.check_output(command)
except subprocess.CalledProcessError as e:
raise Exception("Error %i from tracker-extract, output: %s" %
(e.returncode, e.output))
diff --git a/tests/functional-tests/common/utils/helpers.py b/tests/functional-tests/common/utils/helpers.py
index b34c3d728..cf0d3ee16 100644
--- a/tests/functional-tests/common/utils/helpers.py
+++ b/tests/functional-tests/common/utils/helpers.py
@@ -30,16 +30,20 @@ import re
import configuration as cfg
import options
+
class NoMetadataException (Exception):
pass
REASONABLE_TIMEOUT = 30
-def log (message):
- if options.is_verbose ():
+
+def log(message):
+ if options.is_verbose():
print (message)
+
class Helper:
+
"""
Abstract helper for Tracker processes. Launches the process manually
and waits for it to appear on the session bus.
@@ -58,7 +62,7 @@ class Helper:
BUS_NAME = None
PROCESS_NAME = None
- def __init__ (self):
+ def __init__(self):
self.loop = None
self.bus = None
self.bus_admin = None
@@ -68,63 +72,66 @@ class Helper:
Handler to abort test if an exception occurs inside the GLib main loop.
"""
old_hook = sys.excepthook
+
def new_hook(etype, evalue, etb):
old_hook(etype, evalue, etb)
GLib.MainLoop.quit(loop)
sys.exit()
sys.excepthook = new_hook
- def _get_bus (self):
+ def _get_bus(self):
if self.bus is not None:
return
- self.loop = GObject.MainLoop ()
+ self.loop = GObject.MainLoop()
self.install_glib_excepthook(self.loop)
- dbus_loop = DBusGMainLoop (set_as_default=True)
- self.bus = dbus.SessionBus (dbus_loop)
+ dbus_loop = DBusGMainLoop(set_as_default=True)
+ self.bus = dbus.SessionBus(dbus_loop)
- obj = self.bus.get_object ("org.freedesktop.DBus",
- "/org/freedesktop/DBus")
- self.bus_admin = dbus.Interface (obj, dbus_interface = "org.freedesktop.DBus")
+ obj = self.bus.get_object("org.freedesktop.DBus",
+ "/org/freedesktop/DBus")
+ self.bus_admin = dbus.Interface(
+ obj, dbus_interface="org.freedesktop.DBus")
- def _start_process (self):
- path = getattr (self,
- "PROCESS_PATH",
- os.path.join (cfg.EXEC_PREFIX, self.PROCESS_NAME))
- flags = getattr (self,
- "FLAGS",
- [])
+ def _start_process(self):
+ path = getattr(self,
+ "PROCESS_PATH",
+ os.path.join(cfg.EXEC_PREFIX, self.PROCESS_NAME))
+ flags = getattr(self,
+ "FLAGS",
+ [])
- if options.is_manual_start ():
+ if options.is_manual_start():
print ("Start %s manually" % self.PROCESS_NAME)
else:
kws = {}
- if not options.is_verbose ():
- FNULL = open ('/dev/null', 'w')
- kws = { 'stdout': FNULL, 'stderr': FNULL }
+ if not options.is_verbose():
+ FNULL = open('/dev/null', 'w')
+ kws = {'stdout': FNULL, 'stderr': FNULL}
command = [path] + flags
- log ("Starting %s" % ' '.join(command))
- return subprocess.Popen ([path] + flags, **kws)
+ log("Starting %s" % ' '.join(command))
+ return subprocess.Popen([path] + flags, **kws)
- def _name_owner_changed_cb (self, name, old_owner, new_owner):
+ def _name_owner_changed_cb(self, name, old_owner, new_owner):
if name == self.BUS_NAME:
if old_owner == '' and new_owner != '':
- log ("[%s] appeared in the bus" % self.PROCESS_NAME)
+ log("[%s] appeared in the bus" % self.PROCESS_NAME)
self.available = True
- elif old_owner != '' and new_owner == '':
- log ("[%s] disappeared from the bus" % self.PROCESS_NAME)
+ elif old_owner != '' and new_owner == '':
+ log("[%s] disappeared from the bus" % self.PROCESS_NAME)
self.available = False
else:
- log ("[%s] name change %s -> %s" % (self.PROCESS_NAME, old_owner, new_owner))
+ log("[%s] name change %s -> %s" %
+ (self.PROCESS_NAME, old_owner, new_owner))
- self.loop.quit ()
+ self.loop.quit()
- def _process_watch_cb (self):
- status = self.process.poll ()
+ def _process_watch_cb(self):
+ status = self.process.poll()
if status is None:
return True
@@ -132,46 +139,50 @@ class Helper:
if status == 0 and not self.abort_if_process_exits_with_status_0:
return True
- raise Exception("%s exited with status: %i" % (self.PROCESS_NAME, status))
+ raise Exception("%s exited with status: %i" %
+ (self.PROCESS_NAME, status))
- def _timeout_on_idle_cb (self):
- log ("[%s] Timeout waiting... asumming idle." % self.PROCESS_NAME)
- self.loop.quit ()
+ def _timeout_on_idle_cb(self):
+ log("[%s] Timeout waiting... asumming idle." % self.PROCESS_NAME)
+ self.loop.quit()
self.timeout_id = None
return False
-
- def start (self):
+ def start(self):
"""
Start an instance of process and wait for it to appear on the bus.
"""
- self._get_bus ()
+ self._get_bus()
- if (self.bus_admin.NameHasOwner (self.BUS_NAME)):
- raise Exception ("Unable to start test instance of %s: already running" % self.PROCESS_NAME)
+ if (self.bus_admin.NameHasOwner(self.BUS_NAME)):
+ raise Exception(
+ "Unable to start test instance of %s: already running" % self.PROCESS_NAME)
- self.name_owner_match = self.bus.add_signal_receiver (self._name_owner_changed_cb,
+ self.name_owner_match = self.bus.add_signal_receiver(
+ self._name_owner_changed_cb,
signal_name="NameOwnerChanged",
path="/org/freedesktop/DBus",
dbus_interface="org.freedesktop.DBus")
- self.process = self._start_process ()
- log ('[%s] Started process %i' % (self.PROCESS_NAME, self.process.pid))
+ self.process = self._start_process()
+ log('[%s] Started process %i' % (self.PROCESS_NAME, self.process.pid))
- self.process_watch_timeout = GLib.timeout_add (200, self._process_watch_cb)
+ self.process_watch_timeout = GLib.timeout_add(
+ 200, self._process_watch_cb)
self.abort_if_process_exits_with_status_0 = True
# Run the loop until the bus name appears, or the process dies.
- self.loop.run ()
+ self.loop.run()
self.abort_if_process_exits_with_status_0 = False
- def stop (self):
+ def stop(self):
start = time.time()
if self.process.poll() == None:
- # It should step out of this loop when the miner disappear from the bus
+ # It should step out of this loop when the miner disappear from the
+ # bus
GLib.source_remove(self.process_watch_timeout)
self.process.terminate()
@@ -180,25 +191,27 @@ class Helper:
time.sleep(0.1)
if time.time() > (start + REASONABLE_TIMEOUT):
- log ("[%s] Failed to terminate, sending kill!" % self.PROCESS_NAME)
+ log("[%s] Failed to terminate, sending kill!" %
+ self.PROCESS_NAME)
self.process.kill()
self.process.wait()
- log ("[%s] stopped." % self.PROCESS_NAME)
+ log("[%s] stopped." % self.PROCESS_NAME)
# Disconnect the signals of the next start we get duplicated messages
- self.bus._clean_up_signal_match (self.name_owner_match)
+ self.bus._clean_up_signal_match(self.name_owner_match)
- def kill (self):
- self.process.kill ()
+ def kill(self):
+ self.process.kill()
# Name owner changed callback should take us out from this loop
- self.loop.run ()
+ self.loop.run()
- log ("[%s] killed." % self.PROCESS_NAME)
- self.bus._clean_up_signal_match (self.name_owner_match)
+ log("[%s] killed." % self.PROCESS_NAME)
+ self.bus._clean_up_signal_match(self.name_owner_match)
class StoreHelper (Helper):
+
"""
Wrapper for the Store API
@@ -211,40 +224,46 @@ class StoreHelper (Helper):
graph_updated_handler_id = 0
- def start (self):
- Helper.start (self)
+ def start(self):
+ Helper.start(self)
- tracker = self.bus.get_object (cfg.TRACKER_BUSNAME,
- cfg.TRACKER_OBJ_PATH)
+ tracker = self.bus.get_object(cfg.TRACKER_BUSNAME,
+ cfg.TRACKER_OBJ_PATH)
- self.resources = dbus.Interface (tracker,
- dbus_interface=cfg.RESOURCES_IFACE)
+ self.resources = dbus.Interface(tracker,
+ dbus_interface=cfg.RESOURCES_IFACE)
- tracker_backup = self.bus.get_object (cfg.TRACKER_BUSNAME, cfg.TRACKER_BACKUP_OBJ_PATH)
- self.backup_iface = dbus.Interface (tracker_backup, dbus_interface=cfg.BACKUP_IFACE)
+ tracker_backup = self.bus.get_object(
+ cfg.TRACKER_BUSNAME, cfg.TRACKER_BACKUP_OBJ_PATH)
+ self.backup_iface = dbus.Interface(
+ tracker_backup, dbus_interface=cfg.BACKUP_IFACE)
- tracker_stats = self.bus.get_object (cfg.TRACKER_BUSNAME, cfg.TRACKER_STATS_OBJ_PATH)
+ tracker_stats = self.bus.get_object(
+ cfg.TRACKER_BUSNAME, cfg.TRACKER_STATS_OBJ_PATH)
- self.stats_iface = dbus.Interface (tracker_stats, dbus_interface=cfg.STATS_IFACE)
+ self.stats_iface = dbus.Interface(
+ tracker_stats, dbus_interface=cfg.STATS_IFACE)
- tracker_status = self.bus.get_object (cfg.TRACKER_BUSNAME,
- cfg.TRACKER_STATUS_OBJ_PATH)
- self.status_iface = dbus.Interface (tracker_status, dbus_interface=cfg.STATUS_IFACE)
+ tracker_status = self.bus.get_object(cfg.TRACKER_BUSNAME,
+ cfg.TRACKER_STATUS_OBJ_PATH)
+ self.status_iface = dbus.Interface(
+ tracker_status, dbus_interface=cfg.STATUS_IFACE)
- log ("[%s] booting..." % self.PROCESS_NAME)
- self.status_iface.Wait ()
- log ("[%s] ready." % self.PROCESS_NAME)
+ log("[%s] booting..." % self.PROCESS_NAME)
+ self.status_iface.Wait()
+ log("[%s] ready." % self.PROCESS_NAME)
- self.reset_graph_updates_tracking ()
- self.graph_updated_handler_id = self.bus.add_signal_receiver (self._graph_updated_cb,
- signal_name = "GraphUpdated",
- path = cfg.TRACKER_OBJ_PATH,
- dbus_interface = cfg.RESOURCES_IFACE)
+ self.reset_graph_updates_tracking()
+ self.graph_updated_handler_id = self.bus.add_signal_receiver(
+ self._graph_updated_cb,
+ signal_name="GraphUpdated",
+ path=cfg.TRACKER_OBJ_PATH,
+ dbus_interface=cfg.RESOURCES_IFACE)
- def stop (self):
- Helper.stop (self)
+ def stop(self):
+ Helper.stop(self)
- self.bus._clean_up_signal_match (self.graph_updated_handler_id)
+ self.bus._clean_up_signal_match(self.graph_updated_handler_id)
# A system to follow GraphUpdated and make sure all changes are tracked.
# This code saves every change notification received, and exposes methods
@@ -252,20 +271,20 @@ class StoreHelper (Helper):
# the list of events already received and wait for more if the event has
# not yet happened.
- def reset_graph_updates_tracking (self):
+ def reset_graph_updates_tracking(self):
self.inserts_list = []
self.deletes_list = []
self.inserts_match_function = None
self.deletes_match_function = None
self.graph_updated_timed_out = False
- def _graph_updated_timeout_cb (self):
+ def _graph_updated_timeout_cb(self):
# Don't fail here, exceptions don't get propagated correctly
# from the GMainLoop
self.graph_updated_timed_out = True
- self.loop.quit ()
+ self.loop.quit()
- def _graph_updated_cb (self, class_name, deletes_list, inserts_list):
+ def _graph_updated_cb(self, class_name, deletes_list, inserts_list):
"""
Process notifications from tracker-store on resource changes.
"""
@@ -274,24 +293,27 @@ class StoreHelper (Helper):
if inserts_list is not None:
if self.inserts_match_function is not None:
# The match function will remove matched entries from the list
- (exit_loop, inserts_list) = self.inserts_match_function (inserts_list)
+ (exit_loop, inserts_list) = self.inserts_match_function(
+ inserts_list)
self.inserts_list += inserts_list
if deletes_list is not None:
if self.deletes_match_function is not None:
- (exit_loop, deletes_list) = self.deletes_match_function (deletes_list)
+ (exit_loop, deletes_list) = self.deletes_match_function(
+ deletes_list)
self.deletes_list += deletes_list
if exit_loop:
GLib.source_remove(self.graph_updated_timeout_id)
self.graph_updated_timeout_id = 0
- self.loop.quit ()
+ self.loop.quit()
- def _enable_await_timeout (self):
- self.graph_updated_timeout_id = GLib.timeout_add_seconds (REASONABLE_TIMEOUT,
+ def _enable_await_timeout(self):
+ self.graph_updated_timeout_id = GLib.timeout_add_seconds(
+ REASONABLE_TIMEOUT,
self._graph_updated_timeout_cb)
- def await_resource_inserted (self, rdf_class, url = None, title = None, required_property = None):
+ def await_resource_inserted(self, rdf_class, url=None, title=None, required_property=None):
"""
Block until a resource matching the parameters becomes available
"""
@@ -300,14 +322,18 @@ class StoreHelper (Helper):
self.matched_resource_urn = None
self.matched_resource_id = None
- log ("Await new %s (%i existing inserts)" % (rdf_class, len (self.inserts_list)))
+ log("Await new %s (%i existing inserts)" %
+ (rdf_class, len(self.inserts_list)))
if required_property is not None:
- required_property_id = self.get_resource_id_by_uri(required_property)
- log ("Required property %s id %i" % (required_property, required_property_id))
+ required_property_id = self.get_resource_id_by_uri(
+ required_property)
+ log("Required property %s id %i" %
+ (required_property, required_property_id))
+
+ known_subjects = set()
- known_subjects = set ()
- def find_resource_insertion (inserts_list):
+ def find_resource_insertion(inserts_list):
matched_creation = (self.matched_resource_id is not None)
matched_required_property = False
remaining_events = []
@@ -319,7 +345,7 @@ class StoreHelper (Helper):
id = insert[1]
if not matched_creation and id not in known_subjects:
- known_subjects.add (id)
+ known_subjects.add(id)
where = " ?urn a %s " % rdf_class
@@ -329,24 +355,26 @@ class StoreHelper (Helper):
if title is not None:
where += "; nie:title \"%s\"" % title
- query = "SELECT ?urn WHERE { %s FILTER (tracker:id(?urn) = %s)}" % (where, insert[1])
- result_set = self.query (query)
+ query = "SELECT ?urn WHERE { %s FILTER (tracker:id(?urn) = %s)}" % (
+ where, insert[1])
+ result_set = self.query(query)
- if len (result_set) > 0:
+ if len(result_set) > 0:
matched_creation = True
self.matched_resource_urn = result_set[0][0]
self.matched_resource_id = insert[1]
- log ("Matched creation of resource %s (%i)" %
- (self.matched_resource_urn,
- self.matched_resource_id))
+ log("Matched creation of resource %s (%i)" %
+ (self.matched_resource_urn,
+ self.matched_resource_id))
if required_property is not None:
- log ("Waiting for property %s (%i) to be set" %
- (required_property, required_property_id))
+ log("Waiting for property %s (%i) to be set" %
+ (required_property, required_property_id))
if required_property is not None and matched_creation and not matched_required_property:
if id == self.matched_resource_id and insert[2] == required_property_id:
matched_required_property = True
- log ("Matched %s %s" % (self.matched_resource_urn, required_property))
+ log("Matched %s %s" %
+ (self.matched_resource_urn, required_property))
if not matched_creation or id != self.matched_resource_id:
remaining_events += [insert]
@@ -354,34 +382,37 @@ class StoreHelper (Helper):
matched = matched_creation if required_property is None else matched_required_property
return matched, remaining_events
- def match_cb (inserts_list):
- matched, remaining_events = find_resource_insertion (inserts_list)
+ def match_cb(inserts_list):
+ matched, remaining_events = find_resource_insertion(inserts_list)
exit_loop = matched
return exit_loop, remaining_events
# Check the list of previously received events for matches
- (existing_match, self.inserts_list) = find_resource_insertion (self.inserts_list)
+ (existing_match, self.inserts_list) = find_resource_insertion(
+ self.inserts_list)
if not existing_match:
- self._enable_await_timeout ()
+ self._enable_await_timeout()
self.inserts_match_function = match_cb
# Run the event loop until the correct notification arrives
- self.loop.run ()
+ self.loop.run()
self.inserts_match_function = None
if self.graph_updated_timed_out:
- raise Exception ("Timeout waiting for resource: class %s, URL %s, title %s" % (rdf_class, url, title))
+ raise Exception(
+ "Timeout waiting for resource: class %s, URL %s, title %s" % (rdf_class, url, title))
return (self.matched_resource_id, self.matched_resource_urn)
- def await_resource_deleted (self, id, fail_message = None):
+ def await_resource_deleted(self, id, fail_message=None):
"""
Block until we are notified of a resources deletion
"""
assert (self.deletes_match_function == None)
- def find_resource_deletion (deletes_list):
- log ("find_resource_deletion: looking for %i in %s" % (id, deletes_list))
+ def find_resource_deletion(deletes_list):
+ log("find_resource_deletion: looking for %i in %s" %
+ (id, deletes_list))
matched = False
remaining_events = []
@@ -394,31 +425,33 @@ class StoreHelper (Helper):
return matched, remaining_events
- def match_cb (deletes_list):
+ def match_cb(deletes_list):
matched, remaining_events = find_resource_deletion(deletes_list)
exit_loop = matched
return exit_loop, remaining_events
- log ("Await deletion of %i (%i existing)" % (id, len (self.deletes_list)))
+ log("Await deletion of %i (%i existing)" %
+ (id, len(self.deletes_list)))
- (existing_match, self.deletes_list) = find_resource_deletion (self.deletes_list)
+ (existing_match, self.deletes_list) = find_resource_deletion(
+ self.deletes_list)
if not existing_match:
- self._enable_await_timeout ()
+ self._enable_await_timeout()
self.deletes_match_function = match_cb
# Run the event loop until the correct notification arrives
- self.loop.run ()
+ self.loop.run()
self.deletes_match_function = None
if self.graph_updated_timed_out:
if fail_message is not None:
- raise Exception (fail_message)
+ raise Exception(fail_message)
else:
- raise Exception ("Resource %i has not been deleted." % id)
+ raise Exception("Resource %i has not been deleted." % id)
return
- def await_property_changed (self, subject_id, property_uri):
+ def await_property_changed(self, subject_id, property_uri):
"""
Block until a property of a resource is updated or inserted.
"""
@@ -426,7 +459,7 @@ class StoreHelper (Helper):
property_id = self.get_resource_id_by_uri(property_uri)
- def find_property_change (inserts_list):
+ def find_property_change(inserts_list):
matched = False
remaining_events = []
@@ -439,103 +472,103 @@ class StoreHelper (Helper):
return matched, remaining_events
- def match_cb (inserts_list):
- matched, remaining_events = find_property_change (inserts_list)
+ def match_cb(inserts_list):
+ matched, remaining_events = find_property_change(inserts_list)
exit_loop = matched
return exit_loop, remaining_events
# Check the list of previously received events for matches
- (existing_match, self.inserts_list) = find_property_change (self.inserts_list)
+ (existing_match, self.inserts_list) = find_property_change(
+ self.inserts_list)
if not existing_match:
- self._enable_await_timeout ()
+ self._enable_await_timeout()
self.inserts_match_function = match_cb
# Run the event loop until the correct notification arrives
- self.loop.run ()
+ self.loop.run()
self.inserts_match_function = None
if self.graph_updated_timed_out:
- raise Exception ("Timeout waiting for property change, subject %i "
- "property %s" % (subject_id, property_uri))
+ raise Exception("Timeout waiting for property change, subject %i "
+ "property %s" % (subject_id, property_uri))
- def query (self, query, timeout=5000):
+ def query(self, query, timeout=5000):
try:
- return self.resources.SparqlQuery (query, timeout=timeout)
+ return self.resources.SparqlQuery(query, timeout=timeout)
except dbus.DBusException as (e):
- if (e.get_dbus_name().startswith ("org.freedesktop.DBus")):
- self.start ()
- return self.resources.SparqlQuery (query, timeout=timeout)
+ if (e.get_dbus_name().startswith("org.freedesktop.DBus")):
+ self.start()
+ return self.resources.SparqlQuery(query, timeout=timeout)
raise (e)
- def update (self, update_sparql, timeout=5000):
+ def update(self, update_sparql, timeout=5000):
try:
- return self.resources.SparqlUpdate (update_sparql, timeout=timeout)
+ return self.resources.SparqlUpdate(update_sparql, timeout=timeout)
except dbus.DBusException as (e):
- if (e.get_dbus_name().startswith ("org.freedesktop.DBus")):
- self.start ()
- return self.resources.SparqlUpdate (update_sparql, timeout=timeout)
+ if (e.get_dbus_name().startswith("org.freedesktop.DBus")):
+ self.start()
+ return self.resources.SparqlUpdate(update_sparql, timeout=timeout)
raise (e)
- def batch_update (self, update_sparql):
+ def batch_update(self, update_sparql):
try:
- return self.resources.BatchSparqlUpdate (update_sparql)
+ return self.resources.BatchSparqlUpdate(update_sparql)
except dbus.DBusException as (e):
- if (e.get_dbus_name().startswith ("org.freedesktop.DBus")):
- self.start ()
- return self.resources.BatchSparqlUpdate (update_sparql)
+ if (e.get_dbus_name().startswith("org.freedesktop.DBus")):
+ self.start()
+ return self.resources.BatchSparqlUpdate(update_sparql)
raise (e)
- def batch_commit (self):
- return self.resources.BatchCommit ()
+ def batch_commit(self):
+ return self.resources.BatchCommit()
- def backup (self, backup_file):
+ def backup(self, backup_file):
try:
- self.backup_iface.Save (backup_file)
+ self.backup_iface.Save(backup_file)
except dbus.DBusException as (e):
- if (e.get_dbus_name().startswith ("org.freedesktop.DBus")):
- self.start ()
- return self.backup_iface.Save (backup_file)
+ if (e.get_dbus_name().startswith("org.freedesktop.DBus")):
+ self.start()
+ return self.backup_iface.Save(backup_file)
raise (e)
-
- def restore (self, backup_file):
+
+ def restore(self, backup_file):
try:
- return self.backup_iface.Restore (backup_file)
+ return self.backup_iface.Restore(backup_file)
except dbus.DBusException as (e):
- if (e.get_dbus_name().startswith ("org.freedesktop.DBus")):
- self.start ()
- return self.backup_iface.Restore (backup_file)
+ if (e.get_dbus_name().startswith("org.freedesktop.DBus")):
+ self.start()
+ return self.backup_iface.Restore(backup_file)
raise (e)
- def get_stats (self):
+ def get_stats(self):
try:
- return self.stats_iface.Get ()
+ return self.stats_iface.Get()
except dbus.DBusException as (e):
- if (e.get_dbus_name().startswith ("org.freedesktop.DBus")):
- self.start ()
- return self.stats_iface.Get ()
+ if (e.get_dbus_name().startswith("org.freedesktop.DBus")):
+ self.start()
+ return self.stats_iface.Get()
raise (e)
-
- def get_tracker_iface (self):
+ def get_tracker_iface(self):
return self.resources
- def count_instances (self, ontology_class):
+ def count_instances(self, ontology_class):
QUERY = """
SELECT COUNT(?u) WHERE {
?u a %s .
}
"""
try:
- result = self.resources.SparqlQuery (QUERY % (ontology_class))
+ result = self.resources.SparqlQuery(QUERY % (ontology_class))
except dbus.DBusException as (e):
- if (e.get_dbus_name().startswith ("org.freedesktop.DBus")):
- self.start ()
- result = self.resources.SparqlQuery (QUERY % (ontology_class))
+ if (e.get_dbus_name().startswith("org.freedesktop.DBus")):
+ self.start()
+ result = self.resources.SparqlQuery(QUERY % (ontology_class))
else:
raise (e)
-
- if (len (result) == 1):
- return int (result [0][0])
+
+ if (len(result) == 1):
+ return int(result[0][0])
else:
return -1
@@ -546,11 +579,11 @@ class StoreHelper (Helper):
result = self.query(
'SELECT tracker:id(%s) WHERE { }' % uri)
if len(result) == 1:
- return int (result [0][0])
+ return int(result[0][0])
elif len(result) == 0:
- raise Exception ("No entry for resource %s" % uri)
+ raise Exception("No entry for resource %s" % uri)
else:
- raise Exception ("Multiple entries for resource %s" % uri)
+ raise Exception("Multiple entries for resource %s" % uri)
# FIXME: rename to get_resource_id_by_nepomuk_url !!
def get_resource_id(self, url):
@@ -560,44 +593,44 @@ class StoreHelper (Helper):
result = self.query(
'SELECT tracker:id(?r) WHERE { ?r nie:url "%s" }' % url)
if len(result) == 1:
- return int (result [0][0])
+ return int(result[0][0])
elif len(result) == 0:
- raise Exception ("No entry for resource %s" % url)
+ raise Exception("No entry for resource %s" % url)
else:
- raise Exception ("Multiple entries for resource %s" % url)
+ raise Exception("Multiple entries for resource %s" % url)
- def ask (self, ask_query):
- assert ask_query.strip ().startswith ("ASK")
- result = self.query (ask_query)
- assert len (result) == 1
+ def ask(self, ask_query):
+ assert ask_query.strip().startswith("ASK")
+ result = self.query(ask_query)
+ assert len(result) == 1
if result[0][0] == "true":
return True
elif result[0][0] == "false":
return False
else:
- raise Exception ("Something fishy is going on")
+ raise Exception("Something fishy is going on")
class MinerFsHelper (Helper):
PROCESS_NAME = 'tracker-miner-fs'
- PROCESS_PATH = os.path.join (cfg.EXEC_PREFIX, "tracker-miner-fs")
+ PROCESS_PATH = os.path.join(cfg.EXEC_PREFIX, "tracker-miner-fs")
BUS_NAME = cfg.MINERFS_BUSNAME
FLAGS = ['--initial-sleep=0']
if cfg.haveMaemo:
- FLAGS.append ('--disable-miner=userguides')
+ FLAGS.append('--disable-miner=userguides')
- def start (self):
- Helper.start (self)
+ def start(self):
+ Helper.start(self)
- bus_object = self.bus.get_object (cfg.MINERFS_BUSNAME,
- cfg.MINERFS_OBJ_PATH)
- self.miner_fs = dbus.Interface (bus_object,
- dbus_interface = cfg.MINER_IFACE)
+ bus_object = self.bus.get_object(cfg.MINERFS_BUSNAME,
+ cfg.MINERFS_OBJ_PATH)
+ self.miner_fs = dbus.Interface(bus_object,
+ dbus_interface=cfg.MINER_IFACE)
- def stop (self):
- Helper.stop (self)
+ def stop(self):
+ Helper.stop(self)
class ExtractorHelper (Helper):
@@ -605,8 +638,9 @@ class ExtractorHelper (Helper):
PROCESS_NAME = 'tracker-extract'
BUS_NAME = cfg.TRACKER_EXTRACT_BUSNAME
+
class WritebackHelper (Helper):
PROCESS_NAME = 'tracker-writeback'
- PROCESS_PATH = os.path.join (cfg.EXEC_PREFIX, 'tracker-writeback')
+ PROCESS_PATH = os.path.join(cfg.EXEC_PREFIX, 'tracker-writeback')
BUS_NAME = cfg.WRITEBACK_BUSNAME
diff --git a/tests/functional-tests/common/utils/html.py b/tests/functional-tests/common/utils/html.py
index ec296fe4d..d13e6d388 100644
--- a/tests/functional-tests/common/utils/html.py
+++ b/tests/functional-tests/common/utils/html.py
@@ -2,63 +2,63 @@
import unittest
import os
+
class html:
-
- def top(self):
- os.remove('indexing-performance')
- self.file = 'indexing-performance'
- self.f = open(self.file, "a")
- self.f.write('<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">' + "\n" +
- '<html>' + "\n" +
- ' <head>' + "\n" +
- ' <meta http-equiv="Content-Type" content="text/html; charset=ISO-8859-1">' + "\n" +
- ' <title>Tracker Indexing Performance</title>' + "\n" +
- ' <H1 align="center"><font color="#CC0000" face="verdana" size="6">Tracker Indexing Performance</font></H1>' + "\n" +
- ' <body>' + "\n" +
- ' <table border="1", align="center">' + "\n" +
- '<th><font color="#8000FF" face="verdana" size="4">Test data</font></th>' + "\n" +
- '<th><font color="#8000FF" face="verdana" size="4">Minimum</font></th>' + "\n" +
- '<th><font color="#8000FF" face="verdana" size="4">Maximum</font></th>' + "\n" +
- '<th><font color="#8000FF" face="verdana" size="4">Average</font></th>' + "\n" +
- '<th><font color="#8000FF" face="verdana" size="4">Median</font></th>' + "\n"
- )
- self.f.close()
-
-
- def mid(self,title,min,max,avg,median):
+ def top(self):
- self.file = 'indexing-performance'
- self.f = open(self.file, "a")
- self.f.write( '<tr>' + "\n" +
- '<td>' + title + '</td>' + "\n" +
- '<td>' + str(min) + '</td>' + "\n" +
- '<td>' + str(max) + '</td>' + "\n" +
- '<td>' + str(avg) + '</td>' + "\n" +
- '<td>' + str(median) + '</td>' + "\n" +
- '</tr>' + "\n"
- )
- self.f.close()
+ os.remove('indexing-performance')
+ self.file = 'indexing-performance'
+ self.f = open(self.file, "a")
+ self.f.write('<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">' + "\n" +
+ '<html>' + "\n" +
+ ' <head>' + "\n" +
+ ' <meta http-equiv="Content-Type" content="text/html; charset=ISO-8859-1">' + "\n" +
+ ' <title>Tracker Indexing Performance</title>' + "\n" +
+ ' <H1 align="center"><font color="#CC0000" face="verdana" size="6">Tracker Indexing Performance</font></H1>' + "\n" +
+ ' <body>' + "\n" +
+ ' <table border="1", align="center">' + "\n" +
+ '<th><font color="#8000FF" face="verdana" size="4">Test data</font></th>' + "\n" +
+ '<th><font color="#8000FF" face="verdana" size="4">Minimum</font></th>' + "\n" +
+ '<th><font color="#8000FF" face="verdana" size="4">Maximum</font></th>' + "\n" +
+ '<th><font color="#8000FF" face="verdana" size="4">Average</font></th>' + "\n" +
+ '<th><font color="#8000FF" face="verdana" size="4">Median</font></th>' +
+ "\n"
+ )
+ self.f.close()
- def bottom(self):
+ def mid(self, title, min, max, avg, median):
- self.file = 'indexing-performance'
- self.f = open(self.file, "a")
- self.f.write( '</table>' + "\n" +
- ' </body>' + "\n" +
- ' </head>' + "\n" +
- ' </html>' + "\n"
- )
- self.f.close()
+ self.file = 'indexing-performance'
+ self.f = open(self.file, "a")
+ self.f.write('<tr>' + "\n" +
+ '<td>' + title + '</td>' + "\n" +
+ '<td>' + str(min) + '</td>' + "\n" +
+ '<td>' + str(max) + '</td>' + "\n" +
+ '<td>' + str(avg) + '</td>' + "\n" +
+ '<td>' + str(median) + '</td>' + "\n" +
+ '</tr>' + "\n"
+ )
+ self.f.close()
-class report(unittest.TestCase):
+ def bottom(self):
+
+ self.file = 'indexing-performance'
+ self.f = open(self.file, "a")
+ self.f.write('</table>' + "\n" +
+ ' </body>' + "\n" +
+ ' </head>' + "\n" +
+ ' </html>' + "\n"
+ )
+ self.f.close()
- def first(self):
- self.file = html()
- self.file.top()
-
- def last(self):
- self.file = html()
- self.file.bottom()
+class report(unittest.TestCase):
+
+ def first(self):
+ self.file = html()
+ self.file.top()
+ def last(self):
+ self.file = html()
+ self.file.bottom()
diff --git a/tests/functional-tests/common/utils/minertest.py b/tests/functional-tests/common/utils/minertest.py
index c84270e02..fc927b8f6 100644
--- a/tests/functional-tests/common/utils/minertest.py
+++ b/tests/functional-tests/common/utils/minertest.py
@@ -30,16 +30,18 @@ from itertools import chain
MINER_TMP_DIR = cfg.TEST_MONITORED_TMP_DIR
-def path (filename):
- return os.path.join (MINER_TMP_DIR, filename)
-def uri (filename):
- return "file://" + os.path.join (MINER_TMP_DIR, filename)
+def path(filename):
+ return os.path.join(MINER_TMP_DIR, filename)
+
+
+def uri(filename):
+ return "file://" + os.path.join(MINER_TMP_DIR, filename)
DEFAULT_TEXT = "Some stupid content, to have a test file"
-index_dirs = [os.path.join (MINER_TMP_DIR, "test-monitored")]
+index_dirs = [os.path.join(MINER_TMP_DIR, "test-monitored")]
CONF_OPTIONS = {
cfg.DCONF_MINER_SCHEMA: {
'index-recursive-directories': GLib.Variant.new_strv(index_dirs),
@@ -53,7 +55,7 @@ CONF_OPTIONS = {
class CommonTrackerMinerTest (ut.TestCase):
- def prepare_directories (self):
+ def prepare_directories(self):
#
# ~/test-monitored/
# /file1.txt
@@ -84,31 +86,31 @@ class CommonTrackerMinerTest (ut.TestCase):
for tf in chain(monitored_files, unmonitored_files):
testfile = path(tf)
ensure_dir_exists(os.path.dirname(testfile))
- with open (testfile, 'w') as f:
- f.write (DEFAULT_TEXT)
+ with open(testfile, 'w') as f:
+ f.write(DEFAULT_TEXT)
for tf in monitored_files:
self.tracker.await_resource_inserted(
'nfo:TextDocument', url=uri(tf))
- def setUp (self):
+ def setUp(self):
for d in ['test-monitored', 'test-no-monitored']:
dirname = path(d)
- if os.path.exists (dirname):
+ if os.path.exists(dirname):
shutil.rmtree(dirname)
os.makedirs(dirname)
- self.system = TrackerSystemAbstraction ()
+ self.system = TrackerSystemAbstraction()
- self.system.tracker_miner_fs_testing_start (CONF_OPTIONS)
+ self.system.tracker_miner_fs_testing_start(CONF_OPTIONS)
self.tracker = self.system.store
try:
- self.prepare_directories ()
- self.tracker.reset_graph_updates_tracking ()
+ self.prepare_directories()
+ self.tracker.reset_graph_updates_tracking()
except Exception as e:
- self.tearDown ()
+ self.tearDown()
raise
- def tearDown (self):
- self.system.tracker_miner_fs_testing_stop ()
+ def tearDown(self):
+ self.system.tracker_miner_fs_testing_stop()
diff --git a/tests/functional-tests/common/utils/options.py b/tests/functional-tests/common/utils/options.py
index 6bc837905..7bbfad9c1 100644
--- a/tests/functional-tests/common/utils/options.py
+++ b/tests/functional-tests/common/utils/options.py
@@ -21,17 +21,19 @@ parser.add_option("-v", "--verbose", dest="verbose",
# have their own simple commandline parsers will complain
for option in ["--startmanually", "-m", "--verbose", "-v"]:
try:
- sys.argv.remove (option)
+ sys.argv.remove(option)
except ValueError:
pass
-def is_verbose ():
+
+def is_verbose():
"""
True to log process status information to stdout
"""
return options.verbose
-def is_manual_start ():
+
+def is_manual_start():
"""
False to start the processes automatically
"""
diff --git a/tests/functional-tests/common/utils/storetest.py b/tests/functional-tests/common/utils/storetest.py
index be16b6caa..3c35e49fb 100644
--- a/tests/functional-tests/common/utils/storetest.py
+++ b/tests/functional-tests/common/utils/storetest.py
@@ -28,18 +28,20 @@ from common.utils import configuration as cfg
import unittest2 as ut
#import unittest as ut
+
class CommonTrackerStoreTest (ut.TestCase):
- """
- Common superclass for tests that just require a fresh store running
- """
- @classmethod
- def setUpClass (self):
- #print "Starting the daemon in test mode"
- self.system = TrackerSystemAbstraction ()
- self.system.tracker_store_testing_start ()
- self.tracker = self.system.store
- @classmethod
- def tearDownClass (self):
- #print "Stopping the daemon in test mode (Doing nothing now)"
- self.system.tracker_store_testing_stop ()
+ """
+ Common superclass for tests that just require a fresh store running
+ """
+ @classmethod
+ def setUpClass(self):
+ # print "Starting the daemon in test mode"
+ self.system = TrackerSystemAbstraction()
+ self.system.tracker_store_testing_start()
+ self.tracker = self.system.store
+
+ @classmethod
+ def tearDownClass(self):
+ # print "Stopping the daemon in test mode (Doing nothing now)"
+ self.system.tracker_store_testing_stop()
diff --git a/tests/functional-tests/common/utils/system.py b/tests/functional-tests/common/utils/system.py
index e305aa57f..539d5de19 100644
--- a/tests/functional-tests/common/utils/system.py
+++ b/tests/functional-tests/common/utils/system.py
@@ -18,26 +18,27 @@ import helpers
# Add this after fixing the backup/restore and ontology changes tests
#"G_DEBUG" : "fatal_criticals",
-TEST_ENV_DIRS = { "XDG_DATA_HOME" : os.path.join (cfg.TEST_TMP_DIR, "data"),
- "XDG_CACHE_HOME": os.path.join (cfg.TEST_TMP_DIR, "cache")}
+TEST_ENV_DIRS = {"XDG_DATA_HOME": os.path.join(cfg.TEST_TMP_DIR, "data"),
+ "XDG_CACHE_HOME": os.path.join(cfg.TEST_TMP_DIR, "cache")}
-TEST_ENV_VARS = { "TRACKER_DISABLE_MEEGOTOUCH_LOCALE": "",
- "LC_COLLATE": "en_GB.utf8",
- "DCONF_PROFILE": os.path.join (cfg.DATADIR, "tracker-tests",
- "trackertest") }
+TEST_ENV_VARS = {"TRACKER_DISABLE_MEEGOTOUCH_LOCALE": "",
+ "LC_COLLATE": "en_GB.utf8",
+ "DCONF_PROFILE": os.path.join(cfg.DATADIR, "tracker-tests",
+ "trackertest")}
-EXTRA_DIRS = [os.path.join (cfg.TEST_TMP_DIR, "data", "tracker"),
- os.path.join (cfg.TEST_TMP_DIR, "cache", "tracker")]
+EXTRA_DIRS = [os.path.join(cfg.TEST_TMP_DIR, "data", "tracker"),
+ os.path.join(cfg.TEST_TMP_DIR, "cache", "tracker")]
REASONABLE_TIMEOUT = 30
+
class UnableToBootException (Exception):
pass
class TrackerSystemAbstraction:
- def set_up_environment (self, settings, ontodir):
+ def set_up_environment(self, settings, ontodir):
"""
Sets up the XDG_*_HOME variables and make sure the directories exist
@@ -47,29 +48,30 @@ class TrackerSystemAbstraction:
GLib.Variant instance.
"""
- helpers.log ("[Conf] Setting test environment...")
+ helpers.log("[Conf] Setting test environment...")
- for var, directory in TEST_ENV_DIRS.iteritems ():
- helpers.log ("export %s=%s" %(var, directory))
- self.__recreate_directory (directory)
- os.environ [var] = directory
+ for var, directory in TEST_ENV_DIRS.iteritems():
+ helpers.log("export %s=%s" % (var, directory))
+ self.__recreate_directory(directory)
+ os.environ[var] = directory
for directory in EXTRA_DIRS:
- self.__recreate_directory (directory)
+ self.__recreate_directory(directory)
if ontodir:
- helpers.log ("export %s=%s" % ("TRACKER_DB_ONTOLOGIES_DIR", ontodir))
- os.environ ["TRACKER_DB_ONTOLOGIES_DIR"] = ontodir
+ helpers.log("export %s=%s" %
+ ("TRACKER_DB_ONTOLOGIES_DIR", ontodir))
+ os.environ["TRACKER_DB_ONTOLOGIES_DIR"] = ontodir
- for var, value in TEST_ENV_VARS.iteritems ():
- helpers.log ("export %s=%s" %(var, value))
- os.environ [var] = value
+ for var, value in TEST_ENV_VARS.iteritems():
+ helpers.log("export %s=%s" % (var, value))
+ os.environ[var] = value
# Previous loop should have set DCONF_PROFILE to the test location
if settings is not None:
self._apply_settings(settings)
- helpers.log ("[Conf] environment ready")
+ helpers.log("[Conf] environment ready")
def _apply_settings(self, settings):
for schema_name, contents in settings.iteritems():
@@ -78,139 +80,148 @@ class TrackerSystemAbstraction:
for key, value in contents.iteritems():
dconf.write(key, value)
- def tracker_store_testing_start (self, confdir=None, ontodir=None):
+ def tracker_store_testing_start(self, confdir=None, ontodir=None):
"""
Stops any previous instance of the store, calls set_up_environment,
and starts a new instances of the store
"""
- self.set_up_environment (confdir, ontodir)
+ self.set_up_environment(confdir, ontodir)
- self.store = helpers.StoreHelper ()
- self.store.start ()
+ self.store = helpers.StoreHelper()
+ self.store.start()
- def tracker_store_start (self):
- self.store.start ()
+ def tracker_store_start(self):
+ self.store.start()
- def tracker_store_stop_nicely (self):
- self.store.stop ()
+ def tracker_store_stop_nicely(self):
+ self.store.stop()
- def tracker_store_stop_brutally (self):
- self.store.kill ()
+ def tracker_store_stop_brutally(self):
+ self.store.kill()
- def tracker_store_restart_with_new_ontologies (self, ontodir):
- self.store.stop ()
+ def tracker_store_restart_with_new_ontologies(self, ontodir):
+ self.store.stop()
if ontodir:
- helpers.log ("[Conf] Setting %s - %s" % ("TRACKER_DB_ONTOLOGIES_DIR", ontodir))
- os.environ ["TRACKER_DB_ONTOLOGIES_DIR"] = ontodir
+ helpers.log("[Conf] Setting %s - %s" %
+ ("TRACKER_DB_ONTOLOGIES_DIR", ontodir))
+ os.environ["TRACKER_DB_ONTOLOGIES_DIR"] = ontodir
try:
- self.store.start ()
+ self.store.start()
except dbus.DBusException, e:
- raise UnableToBootException ("Unable to boot the store \n(" + str(e) + ")")
+ raise UnableToBootException(
+ "Unable to boot the store \n(" + str(e) + ")")
- def tracker_store_prepare_journal_replay (self):
- db_location = os.path.join (TEST_ENV_DIRS ['XDG_CACHE_HOME'], "tracker", "meta.db")
- os.unlink (db_location)
+ def tracker_store_prepare_journal_replay(self):
+ db_location = os.path.join(
+ TEST_ENV_DIRS['XDG_CACHE_HOME'], "tracker", "meta.db")
+ os.unlink(db_location)
- lockfile = os.path.join (TEST_ENV_DIRS ['XDG_DATA_HOME'], "tracker", "data", ".ismeta.running")
- f = open (lockfile, 'w')
- f.write (" ")
- f.close ()
+ lockfile = os.path.join(
+ TEST_ENV_DIRS['XDG_DATA_HOME'], "tracker", "data", ".ismeta.running")
+ f = open(lockfile, 'w')
+ f.write(" ")
+ f.close()
- def tracker_store_corrupt_dbs (self):
+ def tracker_store_corrupt_dbs(self):
for filename in ["meta.db", "meta.db-wal"]:
- db_path = os.path.join (TEST_ENV_DIRS ['XDG_CACHE_HOME'], "tracker", filename)
- f = open (db_path, "w")
- for i in range (0, 100):
- f.write ("Some stupid content... hohohoho, not a sqlite file anymore!\n")
- f.close ()
-
- def tracker_store_remove_journal (self):
- db_location = os.path.join (TEST_ENV_DIRS ['XDG_DATA_HOME'], "tracker", "data")
- shutil.rmtree (db_location)
- os.mkdir (db_location)
-
- def tracker_store_remove_dbs (self):
- db_location = os.path.join (TEST_ENV_DIRS ['XDG_CACHE_HOME'], "tracker")
- shutil.rmtree (db_location)
- os.mkdir (db_location)
-
- def tracker_store_testing_stop (self):
+ db_path = os.path.join(
+ TEST_ENV_DIRS['XDG_CACHE_HOME'], "tracker", filename)
+ f = open(db_path, "w")
+ for i in range(0, 100):
+ f.write(
+ "Some stupid content... hohohoho, not a sqlite file anymore!\n")
+ f.close()
+
+ def tracker_store_remove_journal(self):
+ db_location = os.path.join(
+ TEST_ENV_DIRS['XDG_DATA_HOME'], "tracker", "data")
+ shutil.rmtree(db_location)
+ os.mkdir(db_location)
+
+ def tracker_store_remove_dbs(self):
+ db_location = os.path.join(
+ TEST_ENV_DIRS['XDG_CACHE_HOME'], "tracker")
+ shutil.rmtree(db_location)
+ os.mkdir(db_location)
+
+ def tracker_store_testing_stop(self):
"""
Stops a running tracker-store
"""
assert self.store
- self.store.stop ()
-
+ self.store.stop()
- def tracker_miner_fs_testing_start (self, confdir=None):
+ def tracker_miner_fs_testing_start(self, confdir=None):
"""
Stops any previous instance of the store and miner, calls set_up_environment,
and starts a new instance of the store and miner-fs
"""
- self.set_up_environment (confdir, None)
+ self.set_up_environment(confdir, None)
# Start also the store. DBus autoactivation ignores the env variables.
- self.store = helpers.StoreHelper ()
- self.store.start ()
+ self.store = helpers.StoreHelper()
+ self.store.start()
- self.extractor = helpers.ExtractorHelper ()
- self.extractor.start ()
+ self.extractor = helpers.ExtractorHelper()
+ self.extractor.start()
- self.miner_fs = helpers.MinerFsHelper ()
- self.miner_fs.start ()
+ self.miner_fs = helpers.MinerFsHelper()
+ self.miner_fs.start()
- def tracker_miner_fs_testing_stop (self):
+ def tracker_miner_fs_testing_stop(self):
"""
Stops the extractor, miner-fs and store running
"""
- self.extractor.stop ()
- self.miner_fs.stop ()
- self.store.stop ()
+ self.extractor.stop()
+ self.miner_fs.stop()
+ self.store.stop()
- def tracker_writeback_testing_start (self, confdir=None):
+ def tracker_writeback_testing_start(self, confdir=None):
# Start the miner-fs (and store) and then the writeback process
- self.tracker_miner_fs_testing_start (confdir)
- self.writeback = helpers.WritebackHelper ()
- self.writeback.start ()
+ self.tracker_miner_fs_testing_start(confdir)
+ self.writeback = helpers.WritebackHelper()
+ self.writeback.start()
- def tracker_writeback_testing_stop (self):
+ def tracker_writeback_testing_stop(self):
# Tracker write must have been started before
- self.writeback.stop ()
- self.tracker_miner_fs_testing_stop ()
+ self.writeback.stop()
+ self.tracker_miner_fs_testing_stop()
- def tracker_all_testing_start (self, confdir=None):
+ def tracker_all_testing_start(self, confdir=None):
# This will start all miner-fs, store and writeback
- self.tracker_writeback_testing_start (confdir)
+ self.tracker_writeback_testing_start(confdir)
- def tracker_all_testing_stop (self):
+ def tracker_all_testing_stop(self):
# This will stop all miner-fs, store and writeback
- self.tracker_writeback_testing_stop ()
+ self.tracker_writeback_testing_stop()
- def __recreate_directory (self, directory):
- if (os.path.exists (directory)):
- shutil.rmtree (directory)
- os.makedirs (directory)
+ def __recreate_directory(self, directory):
+ if (os.path.exists(directory)):
+ shutil.rmtree(directory)
+ os.makedirs(directory)
if __name__ == "__main__":
- import gtk, glib, time
+ import gtk
+ import glib
+ import time
- def destroy_the_world (a):
- a.tracker_store_testing_stop ()
+ def destroy_the_world(a):
+ a.tracker_store_testing_stop()
print " stopped"
Gtk.main_quit()
print "-- Starting store --"
- a = TrackerSystemAbstraction ()
- a.tracker_store_testing_start ()
+ a = TrackerSystemAbstraction()
+ a.tracker_store_testing_start()
print " started, waiting 5 sec. to stop it"
- GLib.timeout_add_seconds (5, destroy_the_world, a)
- Gtk.main ()
+ GLib.timeout_add_seconds(5, destroy_the_world, a)
+ Gtk.main()
print "-- Starting miner-fs --"
- b = TrackerMinerFsLifeCycle ()
- b.start ()
+ b = TrackerMinerFsLifeCycle()
+ b.start()
print " started, waiting 3 secs. to stop it"
- time.sleep (3)
- b.stop ()
+ time.sleep(3)
+ b.stop()
print " stopped"
diff --git a/tests/functional-tests/common/utils/writebacktest.py b/tests/functional-tests/common/utils/writebacktest.py
index 63c3ef7b6..dd96c4f6e 100644
--- a/tests/functional-tests/common/utils/writebacktest.py
+++ b/tests/functional-tests/common/utils/writebacktest.py
@@ -32,7 +32,7 @@ TEST_FILE_JPEG = "writeback-test-1.jpeg"
TEST_FILE_TIFF = "writeback-test-2.tif"
TEST_FILE_PNG = "writeback-test-4.png"
-WRITEBACK_TMP_DIR = os.path.join (cfg.TEST_MONITORED_TMP_DIR, "writeback")
+WRITEBACK_TMP_DIR = os.path.join(cfg.TEST_MONITORED_TMP_DIR, "writeback")
index_dirs = [WRITEBACK_TMP_DIR]
CONF_OPTIONS = {
@@ -45,81 +45,83 @@ CONF_OPTIONS = {
}
-def uri (filename):
- return "file://" + os.path.join (WRITEBACK_TMP_DIR, filename)
+def uri(filename):
+ return "file://" + os.path.join(WRITEBACK_TMP_DIR, filename)
+
class CommonTrackerWritebackTest (ut.TestCase):
+
"""
Superclass to share methods. Shouldn't be run by itself.
Start all processes including writeback, miner pointing to WRITEBACK_TMP_DIR
"""
-
+
@classmethod
- def __prepare_directories (self):
- if (os.path.exists (os.getcwd() + "/test-writeback-data")):
+ def __prepare_directories(self):
+ if (os.path.exists(os.getcwd() + "/test-writeback-data")):
# Use local directory if available
datadir = os.getcwd() + "/test-writeback-data"
else:
- datadir = os.path.join (cfg.DATADIR, "tracker-tests",
- "test-writeback-data")
+ datadir = os.path.join(cfg.DATADIR, "tracker-tests",
+ "test-writeback-data")
if not os.path.exists(WRITEBACK_TMP_DIR):
os.makedirs(WRITEBACK_TMP_DIR)
else:
if not os.path.isdir(WRITEBACK_TMP_DIR):
- raise Exception("%s exists already and is not a directory" % WRITEBACK_TMP_DIR)
+ raise Exception(
+ "%s exists already and is not a directory" % WRITEBACK_TMP_DIR)
- for testfile in [TEST_FILE_JPEG, TEST_FILE_PNG,TEST_FILE_TIFF]:
- origin = os.path.join (datadir, testfile)
- log ("Copying %s -> %s" % (origin, WRITEBACK_TMP_DIR))
- shutil.copy (origin, WRITEBACK_TMP_DIR)
+ for testfile in [TEST_FILE_JPEG, TEST_FILE_PNG, TEST_FILE_TIFF]:
+ origin = os.path.join(datadir, testfile)
+ log("Copying %s -> %s" % (origin, WRITEBACK_TMP_DIR))
+ shutil.copy(origin, WRITEBACK_TMP_DIR)
+ @classmethod
+ def setUpClass(self):
+ # print "Starting the daemon in test mode"
+ self.__prepare_directories()
- @classmethod
- def setUpClass (self):
- #print "Starting the daemon in test mode"
- self.__prepare_directories ()
-
- self.system = TrackerSystemAbstraction ()
+ self.system = TrackerSystemAbstraction()
- self.system.tracker_writeback_testing_start (CONF_OPTIONS)
+ self.system.tracker_writeback_testing_start(CONF_OPTIONS)
def await_resource_extraction(url):
# Make sure a resource has been crawled by the FS miner and by
# tracker-extract. The extractor adds nie:contentCreated for
# image resources, so know once this property is set the
# extraction is complete.
- self.system.store.await_resource_inserted('nfo:Image', url=url, required_property='nfo:width')
+ self.system.store.await_resource_inserted(
+ 'nfo:Image', url=url, required_property='nfo:width')
- await_resource_extraction (self.get_test_filename_jpeg())
- await_resource_extraction (self.get_test_filename_tiff())
- await_resource_extraction (self.get_test_filename_png())
+ await_resource_extraction(self.get_test_filename_jpeg())
+ await_resource_extraction(self.get_test_filename_tiff())
+ await_resource_extraction(self.get_test_filename_png())
# Returns when ready
- log ("Ready to go!")
-
+ log("Ready to go!")
+
@classmethod
- def tearDownClass (self):
- #print "Stopping the daemon in test mode (Doing nothing now)"
- self.system.tracker_writeback_testing_stop ()
-
+ def tearDownClass(self):
+ # print "Stopping the daemon in test mode (Doing nothing now)"
+ self.system.tracker_writeback_testing_stop()
@staticmethod
- def get_test_filename_jpeg ():
- return uri (TEST_FILE_JPEG)
+ def get_test_filename_jpeg():
+ return uri(TEST_FILE_JPEG)
@staticmethod
- def get_test_filename_tiff ():
- return uri (TEST_FILE_TIFF)
+ def get_test_filename_tiff():
+ return uri(TEST_FILE_TIFF)
@staticmethod
- def get_test_filename_png ():
- return uri (TEST_FILE_PNG)
+ def get_test_filename_png():
+ return uri(TEST_FILE_PNG)
- def get_mtime (self, filename):
+ def get_mtime(self, filename):
return os.stat(filename).st_mtime
- def wait_for_file_change (self, filename, initial_mtime):
+ def wait_for_file_change(self, filename, initial_mtime):
start = time.time()
while time.time() < start + 5:
mtime = os.stat(filename).st_mtime