diff options
Diffstat (limited to 'Lib/bsddb/test/test_early_close.py')
-rw-r--r-- | Lib/bsddb/test/test_early_close.py | 195 |
1 files changed, 0 insertions, 195 deletions
diff --git a/Lib/bsddb/test/test_early_close.py b/Lib/bsddb/test/test_early_close.py deleted file mode 100644 index e944cc6e03..0000000000 --- a/Lib/bsddb/test/test_early_close.py +++ /dev/null @@ -1,195 +0,0 @@ -"""TestCases for checking that it does not segfault when a DBEnv object -is closed before its DB objects. -""" - -import os -import unittest - -from .test_all import db, test_support, verbose, get_new_environment_path, get_new_database_path - -# We're going to get warnings in this module about trying to close the db when -# its env is already closed. Let's just ignore those. -try: - import warnings -except ImportError: - pass -else: - warnings.filterwarnings('ignore', - message='DB could not be closed in', - category=RuntimeWarning) - - -#---------------------------------------------------------------------- - -class DBEnvClosedEarlyCrash(unittest.TestCase): - def setUp(self): - self.homeDir = get_new_environment_path() - self.filename = "test" - - def tearDown(self): - test_support.rmtree(self.homeDir) - - def test01_close_dbenv_before_db(self): - dbenv = db.DBEnv() - dbenv.open(self.homeDir, - db.DB_INIT_CDB| db.DB_CREATE |db.DB_THREAD|db.DB_INIT_MPOOL, - 0o666) - - d = db.DB(dbenv) - d2 = db.DB(dbenv) - d.open(self.filename, db.DB_BTREE, db.DB_CREATE | db.DB_THREAD, 0o666) - - self.assertRaises(db.DBNoSuchFileError, d2.open, - self.filename+"2", db.DB_BTREE, db.DB_THREAD, 0o666) - - d.put("test","this is a test") - self.assertEqual(d.get("test"), "this is a test", "put!=get") - dbenv.close() # This "close" should close the child db handle also - self.assertRaises(db.DBError, d.get, "test") - - def test02_close_dbenv_before_dbcursor(self): - dbenv = db.DBEnv() - dbenv.open(self.homeDir, - db.DB_INIT_CDB| db.DB_CREATE |db.DB_THREAD|db.DB_INIT_MPOOL, - 0o666) - - d = db.DB(dbenv) - d.open(self.filename, db.DB_BTREE, db.DB_CREATE | db.DB_THREAD, 0o666) - - d.put("test","this is a test") - d.put("test2","another test") - d.put("test3","another one") - self.assertEqual(d.get("test"), "this is a test", "put!=get") - c=d.cursor() - c.first() - next(c) - d.close() # This "close" should close the child db handle also - # db.close should close the child cursor - self.assertRaises(db.DBError,c.__next__) - - d = db.DB(dbenv) - d.open(self.filename, db.DB_BTREE, db.DB_CREATE | db.DB_THREAD, 0o666) - c=d.cursor() - c.first() - next(c) - dbenv.close() - # The "close" should close the child db handle also, with cursors - self.assertRaises(db.DBError, c.__next__) - - def test03_close_db_before_dbcursor_without_env(self): - import os.path - path=os.path.join(self.homeDir,self.filename) - d = db.DB() - d.open(path, db.DB_BTREE, db.DB_CREATE | db.DB_THREAD, 0o666) - - d.put("test","this is a test") - d.put("test2","another test") - d.put("test3","another one") - self.assertEqual(d.get("test"), "this is a test", "put!=get") - c=d.cursor() - c.first() - next(c) - d.close() - # The "close" should close the child db handle also - self.assertRaises(db.DBError, c.__next__) - - def test04_close_massive(self): - dbenv = db.DBEnv() - dbenv.open(self.homeDir, - db.DB_INIT_CDB| db.DB_CREATE |db.DB_THREAD|db.DB_INIT_MPOOL, - 0o666) - - dbs=[db.DB(dbenv) for i in range(16)] - cursors=[] - for i in dbs : - i.open(self.filename, db.DB_BTREE, db.DB_CREATE | db.DB_THREAD, 0o666) - - dbs[10].put("test","this is a test") - dbs[10].put("test2","another test") - dbs[10].put("test3","another one") - self.assertEqual(dbs[4].get("test"), "this is a test", "put!=get") - - for i in dbs : - cursors.extend([i.cursor() for j in range(32)]) - - for i in dbs[::3] : - i.close() - for i in cursors[::3] : - i.close() - - # Check for missing exception in DB! (after DB close) - self.assertRaises(db.DBError, dbs[9].get, "test") - - # Check for missing exception in DBCursor! (after DB close) - self.assertRaises(db.DBError, cursors[101].first) - - cursors[80].first() - next(cursors[80]) - dbenv.close() # This "close" should close the child db handle also - # Check for missing exception! (after DBEnv close) - self.assertRaises(db.DBError, cursors[80].__next__) - - def test05_close_dbenv_delete_db_success(self): - dbenv = db.DBEnv() - dbenv.open(self.homeDir, - db.DB_INIT_CDB| db.DB_CREATE |db.DB_THREAD|db.DB_INIT_MPOOL, - 0o666) - - d = db.DB(dbenv) - d.open(self.filename, db.DB_BTREE, db.DB_CREATE | db.DB_THREAD, 0o666) - - dbenv.close() # This "close" should close the child db handle also - - del d - try: - import gc - except ImportError: - gc = None - if gc: - # force d.__del__ [DB_dealloc] to be called - gc.collect() - - def test06_close_txn_before_dup_cursor(self) : - dbenv = db.DBEnv() - dbenv.open(self.homeDir,db.DB_INIT_TXN | db.DB_INIT_MPOOL | - db.DB_INIT_LOG | db.DB_CREATE) - d = db.DB(dbenv) - txn = dbenv.txn_begin() - if db.version() < (4,1) : - d.open(self.filename, dbtype = db.DB_HASH, flags = db.DB_CREATE) - else : - d.open(self.filename, dbtype = db.DB_HASH, flags = db.DB_CREATE, - txn=txn) - d.put("XXX", "yyy", txn=txn) - txn.commit() - txn = dbenv.txn_begin() - c1 = d.cursor(txn) - c2 = c1.dup() - self.assertEquals(("XXX", "yyy"), c1.first()) - import warnings - # Not interested in warnings about implicit close. - warnings.simplefilter("ignore") - txn.commit() - warnings.resetwarnings() - self.assertRaises(db.DBCursorClosedError, c2.first) - - if db.version() > (4,3,0) : - def test07_close_db_before_sequence(self): - import os.path - path=os.path.join(self.homeDir,self.filename) - d = db.DB() - d.open(path, db.DB_BTREE, db.DB_CREATE | db.DB_THREAD, 0o666) - dbs=db.DBSequence(d) - d.close() # This "close" should close the child DBSequence also - dbs.close() # If not closed, core dump (in Berkeley DB 4.6.*) - -#---------------------------------------------------------------------- - -def test_suite(): - suite = unittest.TestSuite() - suite.addTest(unittest.makeSuite(DBEnvClosedEarlyCrash)) - return suite - - -if __name__ == '__main__': - unittest.main(defaultTest='test_suite') |