summaryrefslogtreecommitdiff
path: root/Lib/bsddb/test/test_lock.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/bsddb/test/test_lock.py')
-rw-r--r--Lib/bsddb/test/test_lock.py179
1 files changed, 0 insertions, 179 deletions
diff --git a/Lib/bsddb/test/test_lock.py b/Lib/bsddb/test/test_lock.py
deleted file mode 100644
index 75c1022c2d..0000000000
--- a/Lib/bsddb/test/test_lock.py
+++ /dev/null
@@ -1,179 +0,0 @@
-"""
-TestCases for testing the locking sub-system.
-"""
-
-import time
-
-import unittest
-from .test_all import db, test_support, verbose, have_threads, \
- get_new_environment_path, get_new_database_path
-
-if have_threads :
- from threading import Thread
- import sys
- if sys.version_info[0] < 3 :
- from threading import currentThread
- else :
- from threading import current_thread as currentThread
-
-#----------------------------------------------------------------------
-
-class LockingTestCase(unittest.TestCase):
- import sys
- if sys.version_info[:3] < (2, 4, 0):
- def assertTrue(self, expr, msg=None):
- self.failUnless(expr,msg=msg)
-
-
- def setUp(self):
- self.homeDir = get_new_environment_path()
- self.env = db.DBEnv()
- self.env.open(self.homeDir, db.DB_THREAD | db.DB_INIT_MPOOL |
- db.DB_INIT_LOCK | db.DB_CREATE)
-
-
- def tearDown(self):
- self.env.close()
- test_support.rmtree(self.homeDir)
-
-
- def test01_simple(self):
- if verbose:
- print('\n', '-=' * 30)
- print("Running %s.test01_simple..." % self.__class__.__name__)
-
- anID = self.env.lock_id()
- if verbose:
- print("locker ID: %s" % anID)
- lock = self.env.lock_get(anID, "some locked thing", db.DB_LOCK_WRITE)
- if verbose:
- print("Aquired lock: %s" % lock)
- self.env.lock_put(lock)
- if verbose:
- print("Released lock: %s" % lock)
- self.env.lock_id_free(anID)
-
-
- def test02_threaded(self):
- if verbose:
- print('\n', '-=' * 30)
- print("Running %s.test02_threaded..." % self.__class__.__name__)
-
- threads = []
- threads.append(Thread(target = self.theThread,
- args=(db.DB_LOCK_WRITE,)))
- threads.append(Thread(target = self.theThread,
- args=(db.DB_LOCK_READ,)))
- threads.append(Thread(target = self.theThread,
- args=(db.DB_LOCK_READ,)))
- threads.append(Thread(target = self.theThread,
- args=(db.DB_LOCK_WRITE,)))
- threads.append(Thread(target = self.theThread,
- args=(db.DB_LOCK_READ,)))
- threads.append(Thread(target = self.theThread,
- args=(db.DB_LOCK_READ,)))
- threads.append(Thread(target = self.theThread,
- args=(db.DB_LOCK_WRITE,)))
- threads.append(Thread(target = self.theThread,
- args=(db.DB_LOCK_WRITE,)))
- threads.append(Thread(target = self.theThread,
- args=(db.DB_LOCK_WRITE,)))
-
- for t in threads:
- import sys
- if sys.version_info[0] < 3 :
- t.setDaemon(True)
- else :
- t.daemon = True
- t.start()
- for t in threads:
- t.join()
-
- def test03_lock_timeout(self):
- self.env.set_timeout(0, db.DB_SET_LOCK_TIMEOUT)
- self.env.set_timeout(0, db.DB_SET_TXN_TIMEOUT)
- self.env.set_timeout(123456, db.DB_SET_LOCK_TIMEOUT)
- self.env.set_timeout(7890123, db.DB_SET_TXN_TIMEOUT)
-
- def deadlock_detection() :
- while not deadlock_detection.end :
- deadlock_detection.count = \
- self.env.lock_detect(db.DB_LOCK_EXPIRE)
- if deadlock_detection.count :
- while not deadlock_detection.end :
- pass
- break
- time.sleep(0.01)
-
- deadlock_detection.end=False
- deadlock_detection.count=0
- t=Thread(target=deadlock_detection)
- import sys
- if sys.version_info[0] < 3 :
- t.setDaemon(True)
- else :
- t.daemon = True
- t.start()
- self.env.set_timeout(100000, db.DB_SET_LOCK_TIMEOUT)
- anID = self.env.lock_id()
- anID2 = self.env.lock_id()
- self.assertNotEqual(anID, anID2)
- lock = self.env.lock_get(anID, "shared lock", db.DB_LOCK_WRITE)
- start_time=time.time()
- self.assertRaises(db.DBLockNotGrantedError,
- self.env.lock_get,anID2, "shared lock", db.DB_LOCK_READ)
- end_time=time.time()
- deadlock_detection.end=True
- self.assertTrue((end_time-start_time) >= 0.1)
- self.env.lock_put(lock)
- t.join()
-
- self.env.lock_id_free(anID)
- self.env.lock_id_free(anID2)
-
- if db.version() >= (4,6):
- self.assertTrue(deadlock_detection.count>0)
-
- def theThread(self, lockType):
- import sys
- if sys.version_info[0] < 3 :
- name = currentThread().getName()
- else :
- name = currentThread().name
-
- if lockType == db.DB_LOCK_WRITE:
- lt = "write"
- else:
- lt = "read"
-
- anID = self.env.lock_id()
- if verbose:
- print("%s: locker ID: %s" % (name, anID))
-
- for i in range(1000) :
- lock = self.env.lock_get(anID, "some locked thing", lockType)
- if verbose:
- print("%s: Aquired %s lock: %s" % (name, lt, lock))
-
- self.env.lock_put(lock)
- if verbose:
- print("%s: Released %s lock: %s" % (name, lt, lock))
-
- self.env.lock_id_free(anID)
-
-
-#----------------------------------------------------------------------
-
-def test_suite():
- suite = unittest.TestSuite()
-
- if have_threads:
- suite.addTest(unittest.makeSuite(LockingTestCase))
- else:
- suite.addTest(unittest.makeSuite(LockingTestCase, 'test01'))
-
- return suite
-
-
-if __name__ == '__main__':
- unittest.main(defaultTest='test_suite')