diff options
Diffstat (limited to 'qpid/tools/src/py/qpid-store-chk')
-rwxr-xr-x | qpid/tools/src/py/qpid-store-chk | 332 |
1 files changed, 332 insertions, 0 deletions
diff --git a/qpid/tools/src/py/qpid-store-chk b/qpid/tools/src/py/qpid-store-chk new file mode 100755 index 0000000000..f6d70cb3c6 --- /dev/null +++ b/qpid/tools/src/py/qpid-store-chk @@ -0,0 +1,332 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from qpidstore import jerr, jrnl, janal +import optparse, os, sys + + +#== class StoreChk ============================================================ + +class StoreChk(object): + """ + This class: + 1. Reads a journal jinf file, and from its info: + 2. Analyzes the journal data files to determine which is the last to be written, then + 3. Reads and analyzes all the records in the journal files. + The only public method is run() which kicks off the analysis. + """ + + def __init__(self): + """Constructor""" + # params + self.opts = None + + self._jdir = None + + # recovery analysis objects +# self._jrnl_info = None +# self.jrnl_rdr = None + + self._process_args() + self._jrnl_info = jrnl.JrnlInfo(self._jdir, self.opts.bfn) + # FIXME: This is a hack... find an elegant way of getting the file size to jrec! + jrnl.JRNL_FILE_SIZE = self._jrnl_info.get_jrnl_file_size_bytes() + self.jrnl_anal = janal.JrnlAnalyzer(self._jrnl_info) + self.jrnl_rdr = janal.JrnlReader(self._jrnl_info, self.jrnl_anal, self.opts.qflag, self.opts.rflag, + self.opts.vflag) + + def run(self): + """Run the store check""" + if not self.opts.qflag: + print self._jrnl_info + print self.jrnl_anal + self.jrnl_rdr.run() + self._report() + + def _report(self): + """Print the results of the store check""" + if not self.opts.qflag: + print + print " === REPORT ====" + print + print "Records: %8d non-transactional" % \ + (self.jrnl_rdr.get_msg_cnt() - self.jrnl_rdr.get_txn_msg_cnt()) + print " %8d transactional" % self.jrnl_rdr.get_txn_msg_cnt() + print " %8d total" % self.jrnl_rdr.get_msg_cnt() + print + print "Transactions: %8d aborts" % self.jrnl_rdr.get_abort_cnt() + print " %8d commits" % self.jrnl_rdr.get_commit_cnt() + print " %8d total" % (self.jrnl_rdr.get_abort_cnt() + self.jrnl_rdr.get_commit_cnt()) + print + if self.jrnl_rdr.emap().size() > 0: + print "Remaining enqueued records (sorted by rid): " + rid_list = self.jrnl_rdr.emap().rids() + rid_list.sort() + for rid in rid_list: + l = self.jrnl_rdr.emap().get(rid) + locked = "" + if l[2]: + locked += " (locked)" + print " fid=%d %s%s" % (l[0], l[1], locked) + print "WARNING: Enqueue-Dequeue mismatch, %d enqueued records remain." % self.jrnl_rdr.emap().size() + else: + print "No remaining enqueued records found (emap empty)." + print + if self.jrnl_rdr.tmap().size() > 0: + txn_rec_cnt = 0 + print "Incomplete transactions: " + for xid in self.jrnl_rdr.tmap().xids(): + jrnl.Utils.format_xid(xid) + recs = self.jrnl_rdr.tmap().get(xid) + for l in recs: + print " fid=%d %s" % (l[0], l[1]) + print " Total: %d records for %s" % (len(recs), jrnl.Utils.format_xid(xid)) + print + txn_rec_cnt += len(recs) + print "WARNING: Incomplete transactions found, %d xids remain containing a total of %d records." % \ + (self.jrnl_rdr.tmap().size(), txn_rec_cnt) + else: + print "No incomplete transactions found (tmap empty)." + print + print "%d enqueues, %d journal records processed." % \ + (self.jrnl_rdr.get_msg_cnt(), self.jrnl_rdr.get_rec_cnt()) + + + def _process_args(self): + """Process the command-line arguments""" + opt = optparse.OptionParser(usage="%prog [options] DIR", version="%prog 1.0") + opt.add_option("-b", "--base-filename", + action="store", dest="bfn", default="JournalData", + help="Base filename for old journal files") + opt.add_option("-q", "--quiet", + action="store_true", dest="qflag", + help="Quiet (suppress all non-error output)") + opt.add_option("-r", "--records", + action="store_true", dest="rflag", + help="Print all records and transactions (including consumed/closed)") + opt.add_option("-v", "--verbose", + action="store_true", dest="vflag", + help="Verbose output") + (self.opts, args) = opt.parse_args() + if len(args) == 0: + opt.error("No journal directory argument") + elif len(args) > 1: + opt.error("Too many positional arguments: %s" % args) + if self.opts.qflag and self.opts.rflag: + opt.error("Quiet (-q/--quiet) and record (-r/--records) options are mutually exclusive") + if self.opts.qflag and self.opts.vflag: + opt.error("Quiet (-q/--quiet) and verbose (-v/--verbose) options are mutually exclusive") + self._jdir = args[0] + if not os.path.exists(self._jdir): + opt.error("Journal path \"%s\" does not exist" % self._jdir) + + +#== class CsvStoreChk ========================================================= + +class CsvStoreChk(StoreChk): + """ + This class, in addition to analyzing a journal, can compare the journal footprint (ie enqueued/dequeued/transaction + record counts) to expected values from a CSV file. This can be used for additional automated testing, and is + currently in use in the long store tests for journal encode testing. + """ + + # CSV file cols + TEST_NUM_COL = 0 + NUM_MSGS_COL = 5 + MIN_MSG_SIZE_COL = 7 + MAX_MSG_SIZE_COL = 8 + MIN_XID_SIZE_COL = 9 + MAX_XID_SIZE_COL = 10 + AUTO_DEQ_COL = 11 + TRANSIENT_COL = 12 + EXTERN_COL = 13 + COMMENT_COL = 20 + + def __init__(self): + """Constructor""" + StoreChk.__init__(self) + + # csv params + self.num_msgs = None + self.msg_len = None + self.auto_deq = None + self.xid_len = None + self.transient = None + self.extern = None + + self._warning = [] + + self.jrnl_rdr.set_callbacks(self, CsvStoreChk._csv_pre_run_chk, CsvStoreChk._csv_enq_chk, + CsvStoreChk._csv_deq_chk, CsvStoreChk._csv_txn_chk, CsvStoreChk._csv_post_run_chk) + self._get_csv_test() + + def _get_csv_test(self): + """Get a test from the CSV reader""" + if self.opts.csvfn != None and self.opts.tnum != None: + tparams = self._read_csv_file(self.opts.csvfn, self.opts.tnum) + if tparams == None: + print "ERROR: Test %d not found in CSV file \"%s\"" % (self.opts.tnum, self.opts.csvfn) + sys.exit(1) + self.num_msgs = tparams["num_msgs"] + if tparams["min_size"] == tparams["max_size"]: + self.msg_len = tparams["max_size"] + else: + self.msg_len = 0 + self.auto_deq = tparams["auto_deq"] + if tparams["xid_min_size"] == tparams["xid_max_size"]: + self.xid_len = tparams["xid_max_size"] + else: + self.xid_len = 0 + self.transient = tparams["transient"] + self.extern = tparams["extern"] + + def _read_csv_file(self, filename, tnum): + """Read the CSV test parameter file""" + try: + csvf = open(filename, "r") + except IOError: + print "ERROR: Unable to open CSV file \"%s\"" % filename + sys.exit(1) + for line in csvf: + str_list = line.strip().split(",") + if len(str_list[0]) > 0 and str_list[0][0] != "\"": + try: + if (int(str_list[self.TEST_NUM_COL]) == tnum): + return { "num_msgs": int(str_list[self.NUM_MSGS_COL]), + "min_size": int(str_list[self.MIN_MSG_SIZE_COL]), + "max_size": int(str_list[self.MAX_MSG_SIZE_COL]), + "auto_deq": not (str_list[self.AUTO_DEQ_COL] == "FALSE" or + str_list[self.AUTO_DEQ_COL] == "0"), + "xid_min_size": int(str_list[self.MIN_XID_SIZE_COL]), + "xid_max_size": int(str_list[self.MAX_XID_SIZE_COL]), + "transient": not (str_list[self.TRANSIENT_COL] == "FALSE" or + str_list[self.TRANSIENT_COL] == "0"), + "extern": not (str_list[self.EXTERN_COL] == "FALSE" or + str_list[self.EXTERN_COL] == "0"), + "comment": str_list[self.COMMENT_COL] } + except Exception: + pass + return None + + def _process_args(self): + """Process command-line arguments""" + opt = optparse.OptionParser(usage="%prog [options] DIR", version="%prog 1.0") + opt.add_option("-b", "--base-filename", + action="store", dest="bfn", default="JournalData", + help="Base filename for old journal files") + opt.add_option("-c", "--csv-filename", + action="store", dest="csvfn", + help="CSV filename containing test parameters") + opt.add_option("-q", "--quiet", + action="store_true", dest="qflag", + help="Quiet (suppress all non-error output)") + opt.add_option("-r", "--records", + action="store_true", dest="rflag", + help="Print all records and transactions (including consumed/closed)") + opt.add_option("-t", "--test-num", + action="store", type="int", dest="tnum", + help="Test number from CSV file - only valid if CSV file named") + opt.add_option("-v", "--verbose", + action="store_true", dest="vflag", + help="Verbose output") + (self.opts, args) = opt.parse_args() + if len(args) == 0: + opt.error("No journal directory argument") + elif len(args) > 1: + opt.error("Too many positional arguments: %s" % args) + if self.opts.qflag and self.opts.rflag: + opt.error("Quiet (-q/--quiet) and record (-r/--records) options are mutually exclusive") + if self.opts.qflag and self.opts.vflag: + opt.error("Quiet (-q/--quiet) and verbose (-v/--verbose) options are mutually exclusive") + self._jdir = args[0] + if not os.path.exists(self._jdir): + opt.error("Journal path \"%s\" does not exist" % self._jdir) + + # Callbacks for checking against CSV test parameters. Return False if ok, True to raise error. + + #@staticmethod + def _csv_pre_run_chk(csv_store_chk): + """Check performed before a test runs""" + if csv_store_chk.num_msgs == None: + return + if csv_store_chk.jrnl_anal.is_empty() and csv_store_chk.num_msgs > 0: + raise jerr.AllJrnlFilesEmptyCsvError(csv_store_chk.get_opts().tnum, csv_store_chk.num_msgs) + return False + _csv_pre_run_chk = staticmethod(_csv_pre_run_chk) + + #@staticmethod + def _csv_enq_chk(csv_store_chk, hdr): + """Check performed before each enqueue operation""" + #if csv_store_chk.num_msgs == None: return + # + if csv_store_chk.extern != None: + if csv_store_chk.extern != hdr.extern: + raise jerr.ExternFlagCsvError(csv_store_chk.opts.tnum, csv_store_chk.extern) + if hdr.extern and hdr.data != None: + raise jerr.ExternFlagWithDataCsvError(csv_store_chk.opts.tnum) + if csv_store_chk.msg_len != None and csv_store_chk.msg_len > 0 and hdr.data != None and \ + len(hdr.data) != csv_store_chk.msg_len: + raise jerr.MessageLengthCsvError(csv_store_chk.opts.tnum, csv_store_chk.msg_len, len(hdr.data)) + if csv_store_chk.xid_len != None and csv_store_chk.xid_len > 0 and len(hdr.xid) != csv_store_chk.xid_len: + raise jerr.XidLengthCsvError(csv_store_chk.opts.tnum, csv_store_chk.xid_len, len(hdr.xid)) + if csv_store_chk.transient != None and hdr.transient != csv_store_chk.transient: + raise jerr.TransactionCsvError(csv_store_chk.opts.tnum, csv_store_chk.transient) + return False + _csv_enq_chk = staticmethod(_csv_enq_chk) + + #@staticmethod + def _csv_deq_chk(csv_store_chk, hdr): + """Check performed before each dequeue operation""" + if csv_store_chk.auto_deq != None and not csv_store_chk.auto_deq: + raise jerr.JWarning("[CSV %d] WARNING: Dequeue record rid=%d found in non-dequeue test - ignoring." % + (csv_store_chk.opts.tnum, hdr.rid)) + #self._warning.append("[CSV %d] WARNING: Dequeue record rid=%d found in non-dequeue test - ignoring." % + # (csv_store_chk.opts.tnum, hdr.rid)) + return False + _csv_deq_chk = staticmethod(_csv_deq_chk) + + #@staticmethod + def _csv_txn_chk(csv_store_chk, hdr): + """Check performed before each transaction commit/abort""" + return False + _csv_txn_chk = staticmethod(_csv_txn_chk) + + #@staticmethod + def _csv_post_run_chk(csv_store_chk): + """Cehck performed after the completion of the test""" + # Exclude this check if lastFileFlag is set - the count may be less than the number of msgs sent because + # of journal overwriting + if csv_store_chk.num_msgs != None and not csv_store_chk.jrnl_rdr.is_last_file() and \ + csv_store_chk.num_msgs != csv_store_chk.jrnl_rdr.get_msg_cnt(): + raise jerr.NumMsgsCsvError(csv_store_chk.opts.tnum, csv_store_chk.num_msgs, + csv_store_chk.jrnl_rdr.get_msg_cnt()) + return False + _csv_post_run_chk = staticmethod(_csv_post_run_chk) + +#============================================================================== +# main program +#============================================================================== + +if __name__ == "__main__": + M = CsvStoreChk() + try: + M.run() + except Exception, e: + sys.exit(e) |