diff options
Diffstat (limited to 'src/third_party/wiredtiger/test/suite/wtthread.py')
-rw-r--r-- | src/third_party/wiredtiger/test/suite/wtthread.py | 178 |
1 files changed, 178 insertions, 0 deletions
diff --git a/src/third_party/wiredtiger/test/suite/wtthread.py b/src/third_party/wiredtiger/test/suite/wtthread.py new file mode 100644 index 00000000000..046a915394d --- /dev/null +++ b/src/third_party/wiredtiger/test/suite/wtthread.py @@ -0,0 +1,178 @@ +#!/usr/bin/env python +# +# Public Domain 2014-2016 MongoDB, Inc. +# Public Domain 2008-2014 WiredTiger, Inc. +# +# This is free and unencumbered software released into the public domain. +# +# Anyone is free to copy, modify, publish, use, compile, sell, or +# distribute this software, either in source code form or as a compiled +# binary, for any purpose, commercial or non-commercial, and by any +# means. +# +# In jurisdictions that recognize copyright laws, the author or authors +# of this software dedicate any and all copyright interest in the +# software to the public domain. We make this dedication for the benefit +# of the public at large and to the detriment of our heirs and +# successors. We intend this dedication to be an overt act of +# relinquishment in perpetuity of all present and future rights to this +# software under copyright law. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. +# IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR +# OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, +# ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR +# OTHER DEALINGS IN THE SOFTWARE. + +import Queue +import os, shutil, sys, threading, time, wiredtiger, wttest +from helper import compare_tables + +class checkpoint_thread(threading.Thread): + def __init__(self, conn, done): + self.conn = conn + self.done = done + threading.Thread.__init__(self) + + def run(self): + sess = self.conn.open_session() + while not self.done.isSet(): + # Sleep for 10 milliseconds. + time.sleep(0.001) + sess.checkpoint() + sess.close() + +class backup_thread(threading.Thread): + def __init__(self, conn, backup_dir, done): + self.backup_dir = backup_dir + self.conn = conn + self.done = done + threading.Thread.__init__(self) + + def run(self): + sess = self.conn.open_session() + while not self.done.isSet(): + # Sleep for 2 seconds. + time.sleep(2) + sess.checkpoint() + shutil.rmtree(self.backup_dir, ignore_errors=True) + os.mkdir(self.backup_dir) + cursor = sess.open_cursor('backup:', None, None) + files = list() + while True: + ret = cursor.next() + if ret != 0: + break + files.append(cursor.get_key()) + shutil.copy(cursor.get_key(), self.backup_dir) + + cursor.close() + + bkp_conn = None + try: + bkp_conn = wiredtiger.wiredtiger_open(self.backup_dir) + bkp_session = bkp_conn.open_session() + # Verify that the backup was OK. + uris = list() + for next_file in files: + if next_file.startswith("WiredTiger"): + continue + uri = "file:" + next_file + uris.append(uri) + + # TODO: We want a self.assertTrue here - be need to be a + # wttest to do that.. + if not compare_tables( + self, sess, uris, "checkpoint=WiredTigerCheckpoint"): + print "Error: checkpoint tables differ." + else: + wttest.WiredTigerTestCase.printVerbose( + 3, "Checkpoint tables match") + + if not compare_tables(self, bkp_session, uris): + print "Error: backup tables differ." + else: + wttest.WiredTigerTestCase.printVerbose( + 3, "Backup tables match") + finally: + if bkp_conn != None: + bkp_conn.close() + + sess.close() + +# A worker thread that pulls jobs off a queue. The jobs can be different types. +# Currently supported types are: +# 'i' for insert. +# 'b' for bounce (close and open) a session handle +# 'd' for drop a table +# 't' for create a table and insert a single item into it +class op_thread(threading.Thread): + def __init__(self, conn, uris, key_fmt, queue, done): + self.conn = conn + self.uris = uris + self.key_fmt = key_fmt + self.queue = queue + self.done = done + threading.Thread.__init__(self) + + def run(self): + sess = self.conn.open_session() + if (len(self.uris) == 1): + c = sess.open_cursor(self.uris[0], None, None) + else: + cursors = list() + for next_uri in self.uris: + cursors.append(sess.open_cursor(next_uri, None, None)) + while not self.done.isSet(): + try: + op, key, value = self.queue.get_nowait() + if op == 'gi': # Group insert a number of tables. + sess.begin_transaction() + for next_cur in cursors: + next_cur[key] = value + sess.commit_transaction() + if op == 'gu': # Group update a number of tables. + sess.begin_transaction() + for next_cur in cursors: + next_cur.set_key(key) + next_cur.set_value(value) + next_cur.update() + next_cur.reset() + sess.commit_transaction() + if op == 'i': # Insert an item + c[key] = value + elif op == 'b': # Bounce the session handle. + c.close() + sess.close() + sess = self.conn.open_session() + c = sess.open_cursor(self.uris[0], None, None) + elif op == 'd': # Drop a table. The uri identifier is in key + for i in range(10): + try: + sess.drop(self.uris[0] + str(key)) + break + except wiredtiger.WiredTigerError as e: + continue + elif op == 't': # Create a table, add an entry + sess.create(self.uris[0] + str(key), + "key_format=" + self.key_fmt + ",value_format=S") + try: + c2 = sess.open_cursor(self.uris[0] + str(key), None, None) + c2[key] = value + c2.close() + except wiredtiger.WiredTigerError as e: + # These operations can fail, if the drop in another + # thread happened + pass + self.queue.task_done() + except Queue.Empty: + # Wait on the queue until done is flagged + time.sleep(0.01) + if (len(self.uris) == 1): + c.close() + else: + for next_cursor in cursors: + next_cursor.close() + sess.close() |