summaryrefslogtreecommitdiff
path: root/qpid/tools/src/py/qpidstore/janal.py
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/tools/src/py/qpidstore/janal.py')
-rw-r--r--qpid/tools/src/py/qpidstore/janal.py617
1 files changed, 617 insertions, 0 deletions
diff --git a/qpid/tools/src/py/qpidstore/janal.py b/qpid/tools/src/py/qpidstore/janal.py
new file mode 100644
index 0000000000..1a892aca60
--- /dev/null
+++ b/qpid/tools/src/py/qpidstore/janal.py
@@ -0,0 +1,617 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import jerr, jrnl
+import os.path, sys
+
+
+#== class EnqMap ==============================================================
+
+class EnqMap(object):
+ """Class for maintaining a map of enqueued records, indexing the rid against hdr, fid and transaction lock"""
+
+ def __init__(self):
+ """Constructor"""
+ self.__map = {}
+
+ def __str__(self):
+ """Print the contents of the map"""
+ return self.report(True, True)
+
+ def add(self, fid, hdr, lock = False):
+ """Add a new record into the map"""
+ if hdr.rid in self.__map:
+ raise jerr.DuplicateRidError(hdr.rid)
+ self.__map[hdr.rid] = [fid, hdr, lock]
+
+ def contains(self, rid):
+ """Return True if the map contains the given rid"""
+ return rid in self.__map
+
+ def delete(self, rid):
+ """Delete the rid and its associated data from the map"""
+ if rid in self.__map:
+ if self.get_lock(rid):
+ raise jerr.DeleteLockedRecordError(rid)
+ del self.__map[rid]
+ else:
+ raise jerr.JWarning("ERROR: Deleting non-existent rid from EnqMap: rid=0x%x" % rid)
+
+ def get(self, rid):
+ """Return a list [fid, hdr, lock] for the given rid"""
+ if self.contains(rid):
+ return self.__map[rid]
+ return None
+
+ def get_fid(self, rid):
+ """Return the fid for the given rid"""
+ if self.contains(rid):
+ return self.__map[rid][0]
+ return None
+
+ def get_hdr(self, rid):
+ """Return the header record for the given rid"""
+ if self.contains(rid):
+ return self.__map[rid][1]
+ return None
+
+ def get_lock(self, rid):
+ """Return the transaction lock value for the given rid"""
+ if self.contains(rid):
+ return self.__map[rid][2]
+ return None
+
+ def get_rec_list(self):
+ """Return a list of tuples (fid, hdr, lock) for all entries in the map"""
+ return self.__map.values()
+
+ def lock(self, rid):
+ """Set the transaction lock for a given rid to True"""
+ if rid in self.__map:
+ if not self.__map[rid][2]: # locked
+ self.__map[rid][2] = True
+ else:
+ raise jerr.AlreadyLockedError(rid)
+ else:
+ raise jerr.JWarning("ERROR: Locking non-existent rid in EnqMap: rid=0x%x" % rid)
+
+ def report(self, show_stats, show_records):
+ """Return a string containing a text report for all records in the map"""
+ if len(self.__map) == 0:
+ return "No enqueued records found."
+ rstr = "%d enqueued records found" % len(self.__map)
+ if show_records:
+ rstr += ":"
+ rid_list = self.__map.keys()
+ rid_list.sort()
+ for rid in rid_list:
+ if self.__map[rid][2]:
+ lock_str = " [LOCKED]"
+ else:
+ lock_str = ""
+ rstr += "\n lfid=%d %s %s" % (rec[0], rec[1], lock_str)
+ else:
+ rstr += "."
+ return rstr
+
+ def rids(self):
+ """Return a list of rids in the map"""
+ return self.__map.keys()
+
+ def size(self):
+ """Return the number of entries in the map"""
+ return len(self.__map)
+
+ def unlock(self, rid):
+ """Set the transaction lock for a given rid to False"""
+ if rid in self.__map:
+ if self.__map[rid][2]:
+ self.__map[rid][2] = False
+ else:
+ raise jerr.NotLockedError(rid)
+ else:
+ raise jerr.NonExistentRecordError("unlock", rid)
+
+
+#== class TxnMap ==============================================================
+
+class TxnMap(object):
+ """Transaction map, which maps xids to a list of outstanding actions"""
+
+ def __init__(self, emap):
+ """Constructor, requires an existing EnqMap instance"""
+ self.__emap = emap
+ self.__map = {}
+
+ def __str__(self):
+ """Print the contents of the map"""
+ return self.report(True, True)
+
+ def add(self, fid, hdr):
+ """Add a new transactional record into the map"""
+ if isinstance(hdr, jrnl.DeqRec):
+ try:
+ self.__emap.lock(hdr.deq_rid)
+ except jerr.JWarning:
+ # Not in emap, look for rid in tmap
+ l = self.find_rid(hdr.deq_rid, hdr.xid)
+ if l != None:
+ if l[2]:
+ raise jerr.AlreadyLockedError(hdr.deq_rid)
+ l[2] = True
+ if hdr.xid in self.__map:
+ self.__map[hdr.xid].append([fid, hdr, False]) # append to existing list
+ else:
+ self.__map[hdr.xid] = [[fid, hdr, False]] # create new list
+
+ def contains(self, xid):
+ """Return True if the xid exists in the map; False otherwise"""
+ return xid in self.__map
+
+ def delete(self, hdr):
+ """Remove a transaction record from the map using either a commit or abort header"""
+ if hdr.magic[-1] == "c":
+ return self._commit(hdr.xid)
+ if hdr.magic[-1] == "a":
+ self._abort(hdr.xid)
+ else:
+ raise jerr.InvalidRecordTypeError("delete from TxnMap", hdr.magic, hdr.rid)
+
+ def find_rid(self, rid, xid_hint = None):
+ """ Search for and return map list with supplied rid. If xid_hint is supplied, try that xid first"""
+ if xid_hint != None and self.contains(xid_hint):
+ for l in self.__map[xid_hint]:
+ if l[1].rid == rid:
+ return l
+ for xid in self.__map.iterkeys():
+ if xid_hint == None or xid != xid_hint:
+ for l in self.__map[xid]:
+ if l[1].rid == rid:
+ return l
+
+ def get(self, xid):
+ """Return a list of operations for the given xid"""
+ if self.contains(xid):
+ return self.__map[xid]
+
+ def report(self, show_stats, show_records):
+ """Return a string containing a text report for all records in the map"""
+ if len(self.__map) == 0:
+ return "No outstanding transactions found."
+ rstr = "%d outstanding transactions found" % len(self.__map)
+ if show_records:
+ rstr += ":"
+ for xid, tup in self.__map.iteritems():
+ rstr += "\n xid=%s:" % jrnl.Utils.format_xid(xid)
+ for i in tup:
+ rstr += "\n %s" % str(i[1])
+ else:
+ rstr += "."
+ return rstr
+
+ def size(self):
+ """Return the number of xids in the map"""
+ return len(self.__map)
+
+ def xids(self):
+ """Return a list of xids in the map"""
+ return self.__map.keys()
+
+ def _abort(self, xid):
+ """Perform an abort operation for the given xid record"""
+ for _, hdr, _ in self.__map[xid]:
+ if isinstance(hdr, jrnl.DeqRec):
+ try:
+ self.__emap.unlock(hdr.deq_rid)
+ except jerr.NonExistentRecordError, err: # Not in emap, look in current transaction op list (TPL)
+ found_rid = False
+ for _, hdr1, _ in self.__map[xid]:
+ if isinstance(hdr1, jrnl.EnqRec) and hdr1.rid == hdr.deq_rid:
+ found_rid = True
+ break
+ if not found_rid: # Not found in current transaction op list, re-throw error
+ raise err
+ del self.__map[xid]
+
+ def _commit(self, xid):
+ """Perform a commit operation for the given xid record"""
+ mismatch_list = []
+ for fid, hdr, lock in self.__map[xid]:
+ if isinstance(hdr, jrnl.EnqRec):
+ self.__emap.add(fid, hdr, lock) # Transfer enq to emap
+ else:
+ if self.__emap.contains(hdr.deq_rid):
+ self.__emap.unlock(hdr.deq_rid)
+ self.__emap.delete(hdr.deq_rid)
+ else:
+ mismatch_list.append("0x%x" % hdr.deq_rid)
+ del self.__map[xid]
+ return mismatch_list
+
+#== class JrnlAnalyzer ========================================================
+
+class JrnlAnalyzer(object):
+ """
+ This class analyzes a set of journal files and determines which is the last to be written
+ (the newest file), and hence which should be the first to be read for recovery (the oldest
+ file).
+
+ The analysis is performed on construction; the contents of the JrnlInfo object passed provide
+ the recovery details.
+ """
+
+ def __init__(self, jinf):
+ """Constructor"""
+ self.__oldest = None
+ self.__jinf = jinf
+ self.__flist = self._analyze()
+
+ def __str__(self):
+ """String representation of this JrnlAnalyzer instance, will print out results of analysis."""
+ ostr = "Journal files analyzed in directory %s (* = earliest full):\n" % self.__jinf.get_current_dir()
+ if self.is_empty():
+ ostr += " <All journal files are empty>\n"
+ else:
+ for tup in self.__flist:
+ tmp = " "
+ if tup[0] == self.__oldest[0]:
+ tmp = "*"
+ ostr += " %s %s: owi=%-5s rid=0x%x, fro=0x%x ts=%s\n" % (tmp, os.path.basename(tup[1]), tup[2],
+ tup[3], tup[4], tup[5])
+ for i in range(self.__flist[-1][0] + 1, self.__jinf.get_num_jrnl_files()):
+ ostr += " %s.%04x.jdat: <empty>\n" % (self.__jinf.get_jrnl_base_name(), i)
+ return ostr
+
+ # Analysis
+
+ def get_oldest_file(self):
+ """Return a tuple (ordnum, jfn, owi, rid, fro, timestamp) for the oldest data file found in the journal"""
+ return self.__oldest
+
+ def get_oldest_file_index(self):
+ """Return the ordinal number of the oldest data file found in the journal"""
+ if self.is_empty():
+ return None
+ return self.__oldest[0]
+
+ def is_empty(self):
+ """Return true if the analysis found that the journal file has never been written to"""
+ return len(self.__flist) == 0
+
+ def _analyze(self):
+ """Perform the journal file analysis by reading and comparing the file headers of each journal data file"""
+ owi_found = False
+ flist = []
+ for i in range(0, self.__jinf.get_num_jrnl_files()):
+ jfn = os.path.join(self.__jinf.get_current_dir(), "%s.%04x.jdat" % (self.__jinf.get_jrnl_base_name(), i))
+ fhandle = open(jfn)
+ fhdr = jrnl.Utils.load(fhandle, jrnl.Hdr)
+ if fhdr.empty():
+ break
+ this_tup = (i, jfn, fhdr.owi(), fhdr.rid, fhdr.fro, fhdr.timestamp_str())
+ flist.append(this_tup)
+ if i == 0:
+ init_owi = fhdr.owi()
+ self.__oldest = this_tup
+ elif fhdr.owi() != init_owi and not owi_found:
+ self.__oldest = this_tup
+ owi_found = True
+ return flist
+
+
+#== class JrnlReader ====================================================
+
+class JrnlReader(object):
+ """
+ This class contains an Enqueue Map (emap), a transaction map (tmap) and a transaction
+ object list (txn_obj_list) which are populated by reading the journals from the oldest
+ to the newest and analyzing each record. The JrnlInfo and JrnlAnalyzer
+ objects supplied on construction provide the information used for the recovery.
+
+ The analysis is performed on construction.
+ """
+
+ def __init__(self, jinfo, jra, qflag = False, rflag = False, vflag = False):
+ """Constructor, which reads all """
+ self._jinfo = jinfo
+ self._jra = jra
+ self._qflag = qflag
+ self._rflag = rflag
+ self._vflag = vflag
+
+ # test callback functions for CSV tests
+ self._csv_store_chk = None
+ self._csv_start_cb = None
+ self._csv_enq_cb = None
+ self._csv_deq_cb = None
+ self._csv_txn_cb = None
+ self._csv_end_cb = None
+
+ self._emap = EnqMap()
+ self._tmap = TxnMap(self._emap)
+ self._txn_obj_list = {}
+
+ self._file = None
+ self._file_hdr = None
+ self._file_num = None
+ self._first_rec_flag = None
+ self._fro = None
+ self._last_file_flag = None
+ self._start_file_num = None
+ self._file_hdr_owi = None
+ self._warning = []
+
+ self._abort_cnt = 0
+ self._commit_cnt = 0
+ self._msg_cnt = 0
+ self._rec_cnt = 0
+ self._txn_msg_cnt = 0
+
+ def __str__(self):
+ """Print out all the undequeued records"""
+ return self.report(True, self._rflag)
+
+ def emap(self):
+ """Get the enqueue map"""
+ return self._emap
+
+ def get_abort_cnt(self):
+ """Get the cumulative number of transactional aborts found"""
+ return self._abort_cnt
+
+ def get_commit_cnt(self):
+ """Get the cumulative number of transactional commits found"""
+ return self._commit_cnt
+
+ def get_msg_cnt(self):
+ """Get the cumulative number of messages found"""
+ return self._msg_cnt
+
+ def get_rec_cnt(self):
+ """Get the cumulative number of journal records (including fillers) found"""
+ return self._rec_cnt
+
+ def is_last_file(self):
+ """Return True if the last file is being read"""
+ return self._last_file_flag
+
+ def report(self, show_stats = True, show_records = False):
+ """Return a string containing a report on the file analysis"""
+ rstr = self._emap.report(show_stats, show_records) + "\n" + self._tmap.report(show_stats, show_records)
+ #TODO - print size analysis here - ie how full, sparse, est. space remaining before enq threshold
+ return rstr
+
+ def run(self):
+ """Perform the read of the journal"""
+ if self._csv_start_cb != None and self._csv_start_cb(self._csv_store_chk):
+ return
+ if self._jra.is_empty():
+ return
+ stop = self._advance_jrnl_file(*self._jra.get_oldest_file())
+ while not stop and not self._get_next_record():
+ pass
+ if self._csv_end_cb != None and self._csv_end_cb(self._csv_store_chk):
+ return
+ if not self._qflag:
+ print
+
+ def set_callbacks(self, csv_store_chk, csv_start_cb = None, csv_enq_cb = None, csv_deq_cb = None, csv_txn_cb = None,
+ csv_end_cb = None):
+ """Set callbacks for checks to be made at various points while reading the journal"""
+ self._csv_store_chk = csv_store_chk
+ self._csv_start_cb = csv_start_cb
+ self._csv_enq_cb = csv_enq_cb
+ self._csv_deq_cb = csv_deq_cb
+ self._csv_txn_cb = csv_txn_cb
+ self._csv_end_cb = csv_end_cb
+
+ def tmap(self):
+ """Return the transaction map"""
+ return self._tmap
+
+ def get_txn_msg_cnt(self):
+ """Get the cumulative transactional message count"""
+ return self._txn_msg_cnt
+
+ def txn_obj_list(self):
+ """Get a cumulative list of transaction objects (commits and aborts)"""
+ return self._txn_obj_list
+
+ def _advance_jrnl_file(self, *oldest_file_info):
+ """Rotate to using the next journal file. Return False if the operation was successful, True if there are no
+ more files to read."""
+ fro_seek_flag = False
+ if len(oldest_file_info) > 0:
+ self._start_file_num = self._file_num = oldest_file_info[0]
+ self._fro = oldest_file_info[4]
+ fro_seek_flag = True # jump to fro to start reading
+ if not self._qflag and not self._rflag:
+ if self._vflag:
+ print "Recovering journals..."
+ else:
+ print "Recovering journals",
+ if self._file != None and self._is_file_full():
+ self._file.close()
+ self._file_num = self._incr_file_num()
+ if self._file_num == self._start_file_num:
+ return True
+ if self._start_file_num == 0:
+ self._last_file_flag = self._file_num == self._jinfo.get_num_jrnl_files() - 1
+ else:
+ self._last_file_flag = self._file_num == self._start_file_num - 1
+ if self._file_num < 0 or self._file_num >= self._jinfo.get_num_jrnl_files():
+ raise jerr.BadFileNumberError(self._file_num)
+ jfn = os.path.join(self._jinfo.get_current_dir(), "%s.%04x.jdat" %
+ (self._jinfo.get_jrnl_base_name(), self._file_num))
+ self._file = open(jfn)
+ self._file_hdr = jrnl.Utils.load(self._file, jrnl.Hdr)
+ if fro_seek_flag and self._file.tell() != self._fro:
+ self._file.seek(self._fro)
+ self._first_rec_flag = True
+ if not self._qflag:
+ if self._rflag:
+ print jfn, ": ", self._file_hdr
+ elif self._vflag:
+ print "* Reading %s" % jfn
+ else:
+ print ".",
+ sys.stdout.flush()
+ return False
+
+ def _check_owi(self, hdr):
+ """Return True if the header's owi indicator matches that of the file header record; False otherwise. This can
+ indicate whether the last record in a file has been read and now older records which have not yet been
+ overwritten are now being read."""
+ return self._file_hdr_owi == hdr.owi()
+
+ def _is_file_full(self):
+ """Return True if the current file is full (no more write space); false otherwise"""
+ return self._file.tell() >= self._jinfo.get_jrnl_file_size_bytes()
+
+ def _get_next_record(self):
+ """Get the next record in the file for analysis"""
+ if self._is_file_full():
+ if self._advance_jrnl_file():
+ return True
+ try:
+ hdr = jrnl.Utils.load(self._file, jrnl.Hdr)
+ except:
+ return True
+ if hdr.empty():
+ return True
+ if hdr.check():
+ return True
+ self._rec_cnt += 1
+ self._file_hdr_owi = self._file_hdr.owi()
+ if self._first_rec_flag:
+ if self._file_hdr.fro != hdr.foffs:
+ raise jerr.FirstRecordOffsetMismatch(self._file_hdr.fro, hdr.foffs)
+ else:
+ if self._rflag:
+ print " * fro ok: 0x%x" % self._file_hdr.fro
+ self._first_rec_flag = False
+ stop = False
+ if isinstance(hdr, jrnl.EnqRec):
+ stop = self._handle_enq_rec(hdr)
+ elif isinstance(hdr, jrnl.DeqRec):
+ stop = self._handle_deq_rec(hdr)
+ elif isinstance(hdr, jrnl.TxnRec):
+ stop = self._handle_txn_rec(hdr)
+ wstr = ""
+ for warn in self._warning:
+ wstr += " (%s)" % warn
+ if self._rflag:
+ print " > %s %s" % (hdr, wstr)
+ self._warning = []
+ return stop
+
+ def _handle_deq_rec(self, hdr):
+ """Process a dequeue ("RHMd") record"""
+ if self._load_rec(hdr):
+ return True
+
+ # Check OWI flag
+ if not self._check_owi(hdr):
+ self._warning.append("WARNING: OWI mismatch - could be overwrite boundary.")
+ return True
+ # Test hook
+ if self._csv_deq_cb != None and self._csv_deq_cb(self._csv_store_chk, hdr):
+ return True
+
+ try:
+ if hdr.xid == None:
+ self._emap.delete(hdr.deq_rid)
+ else:
+ self._tmap.add(self._file_hdr.fid, hdr)
+ except jerr.JWarning, warn:
+ self._warning.append(str(warn))
+ return False
+
+ def _handle_enq_rec(self, hdr):
+ """Process a dequeue ("RHMe") record"""
+ if self._load_rec(hdr):
+ return True
+
+ # Check extern flag
+ if hdr.extern and hdr.data != None:
+ raise jerr.ExternFlagDataError(hdr)
+ # Check OWI flag
+ if not self._check_owi(hdr):
+ self._warning.append("WARNING: OWI mismatch - could be overwrite boundary.")
+ return True
+ # Test hook
+ if self._csv_enq_cb != None and self._csv_enq_cb(self._csv_store_chk, hdr):
+ return True
+
+ if hdr.xid == None:
+ self._emap.add(self._file_hdr.fid, hdr)
+ else:
+ self._txn_msg_cnt += 1
+ self._tmap.add(self._file_hdr.fid, hdr)
+ self._msg_cnt += 1
+ return False
+
+ def _handle_txn_rec(self, hdr):
+ """Process a transaction ("RHMa or RHMc") record"""
+ if self._load_rec(hdr):
+ return True
+
+ # Check OWI flag
+ if not self._check_owi(hdr):
+ self._warning.append("WARNING: OWI mismatch - could be overwrite boundary.")
+ return True
+ # Test hook
+ if self._csv_txn_cb != None and self._csv_txn_cb(self._csv_store_chk, hdr):
+ return True
+
+ if hdr.magic[-1] == "a":
+ self._abort_cnt += 1
+ else:
+ self._commit_cnt += 1
+
+ if self._tmap.contains(hdr.xid):
+ mismatched_rids = self._tmap.delete(hdr)
+ if mismatched_rids != None and len(mismatched_rids) > 0:
+ self._warning.append("WARNING: transactional dequeues not found in enqueue map; rids=%s" %
+ mismatched_rids)
+ else:
+ self._warning.append("WARNING: %s not found in transaction map" % jrnl.Utils.format_xid(hdr.xid))
+ if hdr.magic[-1] == "c": # commits only
+ self._txn_obj_list[hdr.xid] = hdr
+ return False
+
+ def _incr_file_num(self):
+ """Increment the number of files read with wraparound (ie after file n-1, go to 0)"""
+ self._file_num += 1
+ if self._file_num >= self._jinfo.get_num_jrnl_files():
+ self._file_num = 0
+ return self._file_num
+
+ def _load_rec(self, hdr):
+ """Load a single record for the given header. There may be arbitrarily large xids and data components."""
+ while not hdr.complete():
+ if self._advance_jrnl_file():
+ return True
+ hdr.load(self._file)
+ return False
+
+# =============================================================================
+
+if __name__ == "__main__":
+ print "This is a library, and cannot be executed."