Diffstat (limited to 'qpid/tools/src/py/qpid-store-chk')
-rwxr-xr-x  qpid/tools/src/py/qpid-store-chk  332
1 files changed, 332 insertions, 0 deletions
diff --git a/qpid/tools/src/py/qpid-store-chk b/qpid/tools/src/py/qpid-store-chk
new file mode 100755
index 0000000000..f6d70cb3c6
--- /dev/null
+++ b/qpid/tools/src/py/qpid-store-chk
@@ -0,0 +1,332 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from qpidstore import jerr, jrnl, janal
+import optparse, os, sys
+
+
+#== class StoreChk ============================================================
+
+class StoreChk(object):
+ """
+    This class:
+    1. Reads a journal .jinf file and, using its info,
+    2. Analyzes the journal data files to determine which was the last to be written, then
+    3. Reads and analyzes all the records in the journal files.
+    The only public method is run(), which kicks off the analysis.
+ """
+
+ def __init__(self):
+ """Constructor"""
+ # params
+ self.opts = None
+
+ self._jdir = None
+
+ # recovery analysis objects
+# self._jrnl_info = None
+# self.jrnl_rdr = None
+
+ self._process_args()
+ self._jrnl_info = jrnl.JrnlInfo(self._jdir, self.opts.bfn)
+ # FIXME: This is a hack... find an elegant way of getting the file size to jrec!
+ jrnl.JRNL_FILE_SIZE = self._jrnl_info.get_jrnl_file_size_bytes()
+ self.jrnl_anal = janal.JrnlAnalyzer(self._jrnl_info)
+ self.jrnl_rdr = janal.JrnlReader(self._jrnl_info, self.jrnl_anal, self.opts.qflag, self.opts.rflag,
+ self.opts.vflag)
+
+ def run(self):
+ """Run the store check"""
+ if not self.opts.qflag:
+ print self._jrnl_info
+ print self.jrnl_anal
+ self.jrnl_rdr.run()
+ self._report()
+
+ def _report(self):
+ """Print the results of the store check"""
+ if not self.opts.qflag:
+ print
+ print " === REPORT ===="
+ print
+ print "Records: %8d non-transactional" % \
+ (self.jrnl_rdr.get_msg_cnt() - self.jrnl_rdr.get_txn_msg_cnt())
+ print " %8d transactional" % self.jrnl_rdr.get_txn_msg_cnt()
+ print " %8d total" % self.jrnl_rdr.get_msg_cnt()
+ print
+ print "Transactions: %8d aborts" % self.jrnl_rdr.get_abort_cnt()
+ print " %8d commits" % self.jrnl_rdr.get_commit_cnt()
+ print " %8d total" % (self.jrnl_rdr.get_abort_cnt() + self.jrnl_rdr.get_commit_cnt())
+ print
+ if self.jrnl_rdr.emap().size() > 0:
+ print "Remaining enqueued records (sorted by rid): "
+ rid_list = self.jrnl_rdr.emap().rids()
+ rid_list.sort()
+ for rid in rid_list:
+ l = self.jrnl_rdr.emap().get(rid)
+ locked = ""
+ if l[2]:
+ locked += " (locked)"
+ print " fid=%d %s%s" % (l[0], l[1], locked)
+ print "WARNING: Enqueue-Dequeue mismatch, %d enqueued records remain." % self.jrnl_rdr.emap().size()
+ else:
+ print "No remaining enqueued records found (emap empty)."
+ print
+ if self.jrnl_rdr.tmap().size() > 0:
+ txn_rec_cnt = 0
+ print "Incomplete transactions: "
+ for xid in self.jrnl_rdr.tmap().xids():
+ recs = self.jrnl_rdr.tmap().get(xid)
+ for l in recs:
+ print " fid=%d %s" % (l[0], l[1])
+ print " Total: %d records for %s" % (len(recs), jrnl.Utils.format_xid(xid))
+ print
+ txn_rec_cnt += len(recs)
+ print "WARNING: Incomplete transactions found, %d xids remain containing a total of %d records." % \
+ (self.jrnl_rdr.tmap().size(), txn_rec_cnt)
+ else:
+ print "No incomplete transactions found (tmap empty)."
+ print
+ print "%d enqueues, %d journal records processed." % \
+ (self.jrnl_rdr.get_msg_cnt(), self.jrnl_rdr.get_rec_cnt())
+
+
+ def _process_args(self):
+ """Process the command-line arguments"""
+ opt = optparse.OptionParser(usage="%prog [options] DIR", version="%prog 1.0")
+ opt.add_option("-b", "--base-filename",
+ action="store", dest="bfn", default="JournalData",
+ help="Base filename for old journal files")
+ opt.add_option("-q", "--quiet",
+ action="store_true", dest="qflag",
+ help="Quiet (suppress all non-error output)")
+ opt.add_option("-r", "--records",
+ action="store_true", dest="rflag",
+ help="Print all records and transactions (including consumed/closed)")
+ opt.add_option("-v", "--verbose",
+ action="store_true", dest="vflag",
+ help="Verbose output")
+ (self.opts, args) = opt.parse_args()
+ if len(args) == 0:
+ opt.error("No journal directory argument")
+ elif len(args) > 1:
+ opt.error("Too many positional arguments: %s" % args)
+ if self.opts.qflag and self.opts.rflag:
+ opt.error("Quiet (-q/--quiet) and record (-r/--records) options are mutually exclusive")
+ if self.opts.qflag and self.opts.vflag:
+ opt.error("Quiet (-q/--quiet) and verbose (-v/--verbose) options are mutually exclusive")
+ self._jdir = args[0]
+ if not os.path.exists(self._jdir):
+ opt.error("Journal path \"%s\" does not exist" % self._jdir)
+
+
+#== class CsvStoreChk =========================================================
+
+class CsvStoreChk(StoreChk):
+ """
+    This class, in addition to analyzing a journal, can compare the journal footprint (i.e. the enqueued, dequeued and
+    transaction record counts) to expected values from a CSV file. This is useful for additional automated testing, and
+    is currently used in the long store tests for journal encode testing.
+ """
+
+ # CSV file cols
+ TEST_NUM_COL = 0
+ NUM_MSGS_COL = 5
+ MIN_MSG_SIZE_COL = 7
+ MAX_MSG_SIZE_COL = 8
+ MIN_XID_SIZE_COL = 9
+ MAX_XID_SIZE_COL = 10
+ AUTO_DEQ_COL = 11
+ TRANSIENT_COL = 12
+ EXTERN_COL = 13
+ COMMENT_COL = 20
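+
+    # A hypothetical CSV row illustrating the column layout above (columns not used by
+    # _read_csv_file() are left empty; a line whose first column starts with a quote is
+    # treated as a header and skipped):
+    #
+    #   42,,,,,100,,512,512,0,0,TRUE,FALSE,FALSE,,,,,,,"100 persistent 512-byte msgs"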
+
+ def __init__(self):
+ """Constructor"""
+ StoreChk.__init__(self)
+
+ # csv params
+ self.num_msgs = None
+ self.msg_len = None
+ self.auto_deq = None
+ self.xid_len = None
+ self.transient = None
+ self.extern = None
+
+ self._warning = []
+
+ self.jrnl_rdr.set_callbacks(self, CsvStoreChk._csv_pre_run_chk, CsvStoreChk._csv_enq_chk,
+ CsvStoreChk._csv_deq_chk, CsvStoreChk._csv_txn_chk, CsvStoreChk._csv_post_run_chk)
+ self._get_csv_test()
+
+ def _get_csv_test(self):
+ """Get a test from the CSV reader"""
+ if self.opts.csvfn != None and self.opts.tnum != None:
+ tparams = self._read_csv_file(self.opts.csvfn, self.opts.tnum)
+ if tparams == None:
+ print "ERROR: Test %d not found in CSV file \"%s\"" % (self.opts.tnum, self.opts.csvfn)
+ sys.exit(1)
+ self.num_msgs = tparams["num_msgs"]
+ if tparams["min_size"] == tparams["max_size"]:
+ self.msg_len = tparams["max_size"]
+ else:
+ self.msg_len = 0
+ self.auto_deq = tparams["auto_deq"]
+ if tparams["xid_min_size"] == tparams["xid_max_size"]:
+ self.xid_len = tparams["xid_max_size"]
+ else:
+ self.xid_len = 0
+ self.transient = tparams["transient"]
+ self.extern = tparams["extern"]
+
+ def _read_csv_file(self, filename, tnum):
+ """Read the CSV test parameter file"""
+ try:
+ csvf = open(filename, "r")
+ except IOError:
+ print "ERROR: Unable to open CSV file \"%s\"" % filename
+ sys.exit(1)
+ for line in csvf:
+ str_list = line.strip().split(",")
+ if len(str_list[0]) > 0 and str_list[0][0] != "\"":
+ try:
+ if (int(str_list[self.TEST_NUM_COL]) == tnum):
+ return { "num_msgs": int(str_list[self.NUM_MSGS_COL]),
+ "min_size": int(str_list[self.MIN_MSG_SIZE_COL]),
+ "max_size": int(str_list[self.MAX_MSG_SIZE_COL]),
+ "auto_deq": not (str_list[self.AUTO_DEQ_COL] == "FALSE" or
+ str_list[self.AUTO_DEQ_COL] == "0"),
+ "xid_min_size": int(str_list[self.MIN_XID_SIZE_COL]),
+ "xid_max_size": int(str_list[self.MAX_XID_SIZE_COL]),
+ "transient": not (str_list[self.TRANSIENT_COL] == "FALSE" or
+ str_list[self.TRANSIENT_COL] == "0"),
+ "extern": not (str_list[self.EXTERN_COL] == "FALSE" or
+ str_list[self.EXTERN_COL] == "0"),
+ "comment": str_list[self.COMMENT_COL] }
+ except Exception:
+ pass
+ return None
+
+ def _process_args(self):
+ """Process command-line arguments"""
+ opt = optparse.OptionParser(usage="%prog [options] DIR", version="%prog 1.0")
+ opt.add_option("-b", "--base-filename",
+ action="store", dest="bfn", default="JournalData",
+ help="Base filename for old journal files")
+ opt.add_option("-c", "--csv-filename",
+ action="store", dest="csvfn",
+ help="CSV filename containing test parameters")
+ opt.add_option("-q", "--quiet",
+ action="store_true", dest="qflag",
+ help="Quiet (suppress all non-error output)")
+ opt.add_option("-r", "--records",
+ action="store_true", dest="rflag",
+ help="Print all records and transactions (including consumed/closed)")
+ opt.add_option("-t", "--test-num",
+ action="store", type="int", dest="tnum",
+ help="Test number from CSV file - only valid if CSV file named")
+ opt.add_option("-v", "--verbose",
+ action="store_true", dest="vflag",
+ help="Verbose output")
+ (self.opts, args) = opt.parse_args()
+ if len(args) == 0:
+ opt.error("No journal directory argument")
+ elif len(args) > 1:
+ opt.error("Too many positional arguments: %s" % args)
+ if self.opts.qflag and self.opts.rflag:
+ opt.error("Quiet (-q/--quiet) and record (-r/--records) options are mutually exclusive")
+ if self.opts.qflag and self.opts.vflag:
+ opt.error("Quiet (-q/--quiet) and verbose (-v/--verbose) options are mutually exclusive")
+ self._jdir = args[0]
+ if not os.path.exists(self._jdir):
+ opt.error("Journal path \"%s\" does not exist" % self._jdir)
+
+ # Callbacks for checking against CSV test parameters. Return False if ok, True to raise error.
+
+ #@staticmethod
+ def _csv_pre_run_chk(csv_store_chk):
+ """Check performed before a test runs"""
+ if csv_store_chk.num_msgs == None:
+ return
+ if csv_store_chk.jrnl_anal.is_empty() and csv_store_chk.num_msgs > 0:
+            raise jerr.AllJrnlFilesEmptyCsvError(csv_store_chk.opts.tnum, csv_store_chk.num_msgs)
+ return False
+ _csv_pre_run_chk = staticmethod(_csv_pre_run_chk)
+
+ #@staticmethod
+ def _csv_enq_chk(csv_store_chk, hdr):
+ """Check performed before each enqueue operation"""
+ #if csv_store_chk.num_msgs == None: return
+ #
+ if csv_store_chk.extern != None:
+ if csv_store_chk.extern != hdr.extern:
+ raise jerr.ExternFlagCsvError(csv_store_chk.opts.tnum, csv_store_chk.extern)
+ if hdr.extern and hdr.data != None:
+ raise jerr.ExternFlagWithDataCsvError(csv_store_chk.opts.tnum)
+ if csv_store_chk.msg_len != None and csv_store_chk.msg_len > 0 and hdr.data != None and \
+ len(hdr.data) != csv_store_chk.msg_len:
+ raise jerr.MessageLengthCsvError(csv_store_chk.opts.tnum, csv_store_chk.msg_len, len(hdr.data))
+ if csv_store_chk.xid_len != None and csv_store_chk.xid_len > 0 and len(hdr.xid) != csv_store_chk.xid_len:
+ raise jerr.XidLengthCsvError(csv_store_chk.opts.tnum, csv_store_chk.xid_len, len(hdr.xid))
+ if csv_store_chk.transient != None and hdr.transient != csv_store_chk.transient:
+ raise jerr.TransactionCsvError(csv_store_chk.opts.tnum, csv_store_chk.transient)
+ return False
+ _csv_enq_chk = staticmethod(_csv_enq_chk)
+
+ #@staticmethod
+ def _csv_deq_chk(csv_store_chk, hdr):
+ """Check performed before each dequeue operation"""
+ if csv_store_chk.auto_deq != None and not csv_store_chk.auto_deq:
+ raise jerr.JWarning("[CSV %d] WARNING: Dequeue record rid=%d found in non-dequeue test - ignoring." %
+ (csv_store_chk.opts.tnum, hdr.rid))
+ #self._warning.append("[CSV %d] WARNING: Dequeue record rid=%d found in non-dequeue test - ignoring." %
+ # (csv_store_chk.opts.tnum, hdr.rid))
+ return False
+ _csv_deq_chk = staticmethod(_csv_deq_chk)
+
+ #@staticmethod
+ def _csv_txn_chk(csv_store_chk, hdr):
+ """Check performed before each transaction commit/abort"""
+ return False
+ _csv_txn_chk = staticmethod(_csv_txn_chk)
+
+ #@staticmethod
+ def _csv_post_run_chk(csv_store_chk):
+ """Cehck performed after the completion of the test"""
+ # Exclude this check if lastFileFlag is set - the count may be less than the number of msgs sent because
+ # of journal overwriting
+ if csv_store_chk.num_msgs != None and not csv_store_chk.jrnl_rdr.is_last_file() and \
+ csv_store_chk.num_msgs != csv_store_chk.jrnl_rdr.get_msg_cnt():
+ raise jerr.NumMsgsCsvError(csv_store_chk.opts.tnum, csv_store_chk.num_msgs,
+ csv_store_chk.jrnl_rdr.get_msg_cnt())
+ return False
+ _csv_post_run_chk = staticmethod(_csv_post_run_chk)
+
+#==============================================================================
+# main program
+#==============================================================================
+
+if __name__ == "__main__":
+ M = CsvStoreChk()
+ try:
+ M.run()
+ except Exception, e:
+ sys.exit(e)
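+
+# Example invocations (a sketch; the journal directory and base filename are placeholders):
+#
+#   qpid-store-chk /path/to/journal/dir
+#   qpid-store-chk -b my-queue -r -v /path/to/journal/dir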