diff options
Diffstat (limited to 'qpid/tools/src/py/qpidstore/janal.py')
-rw-r--r-- | qpid/tools/src/py/qpidstore/janal.py | 617 |
1 files changed, 617 insertions, 0 deletions
diff --git a/qpid/tools/src/py/qpidstore/janal.py b/qpid/tools/src/py/qpidstore/janal.py new file mode 100644 index 0000000000..1a892aca60 --- /dev/null +++ b/qpid/tools/src/py/qpidstore/janal.py @@ -0,0 +1,617 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# + +import jerr, jrnl +import os.path, sys + + +#== class EnqMap ============================================================== + +class EnqMap(object): + """Class for maintaining a map of enqueued records, indexing the rid against hdr, fid and transaction lock""" + + def __init__(self): + """Constructor""" + self.__map = {} + + def __str__(self): + """Print the contents of the map""" + return self.report(True, True) + + def add(self, fid, hdr, lock = False): + """Add a new record into the map""" + if hdr.rid in self.__map: + raise jerr.DuplicateRidError(hdr.rid) + self.__map[hdr.rid] = [fid, hdr, lock] + + def contains(self, rid): + """Return True if the map contains the given rid""" + return rid in self.__map + + def delete(self, rid): + """Delete the rid and its associated data from the map""" + if rid in self.__map: + if self.get_lock(rid): + raise jerr.DeleteLockedRecordError(rid) + del self.__map[rid] + else: + raise jerr.JWarning("ERROR: Deleting non-existent rid from EnqMap: rid=0x%x" % rid) + + def get(self, rid): + """Return a list [fid, hdr, lock] for the given rid""" + if self.contains(rid): + return self.__map[rid] + return None + + def get_fid(self, rid): + """Return the fid for the given rid""" + if self.contains(rid): + return self.__map[rid][0] + return None + + def get_hdr(self, rid): + """Return the header record for the given rid""" + if self.contains(rid): + return self.__map[rid][1] + return None + + def get_lock(self, rid): + """Return the transaction lock value for the given rid""" + if self.contains(rid): + return self.__map[rid][2] + return None + + def get_rec_list(self): + """Return a list of tuples (fid, hdr, lock) for all entries in the map""" + return self.__map.values() + + def lock(self, rid): + """Set the transaction lock for a given rid to True""" + if rid in self.__map: + if not self.__map[rid][2]: # locked + self.__map[rid][2] = True + else: + raise jerr.AlreadyLockedError(rid) + else: + raise jerr.JWarning("ERROR: Locking 
non-existent rid in EnqMap: rid=0x%x" % rid) + + def report(self, show_stats, show_records): + """Return a string containing a text report for all records in the map""" + if len(self.__map) == 0: + return "No enqueued records found." + rstr = "%d enqueued records found" % len(self.__map) + if show_records: + rstr += ":" + rid_list = self.__map.keys() + rid_list.sort() + for rid in rid_list: + if self.__map[rid][2]: + lock_str = " [LOCKED]" + else: + lock_str = "" + rstr += "\n lfid=%d %s %s" % (rec[0], rec[1], lock_str) + else: + rstr += "." + return rstr + + def rids(self): + """Return a list of rids in the map""" + return self.__map.keys() + + def size(self): + """Return the number of entries in the map""" + return len(self.__map) + + def unlock(self, rid): + """Set the transaction lock for a given rid to False""" + if rid in self.__map: + if self.__map[rid][2]: + self.__map[rid][2] = False + else: + raise jerr.NotLockedError(rid) + else: + raise jerr.NonExistentRecordError("unlock", rid) + + +#== class TxnMap ============================================================== + +class TxnMap(object): + """Transaction map, which maps xids to a list of outstanding actions""" + + def __init__(self, emap): + """Constructor, requires an existing EnqMap instance""" + self.__emap = emap + self.__map = {} + + def __str__(self): + """Print the contents of the map""" + return self.report(True, True) + + def add(self, fid, hdr): + """Add a new transactional record into the map""" + if isinstance(hdr, jrnl.DeqRec): + try: + self.__emap.lock(hdr.deq_rid) + except jerr.JWarning: + # Not in emap, look for rid in tmap + l = self.find_rid(hdr.deq_rid, hdr.xid) + if l != None: + if l[2]: + raise jerr.AlreadyLockedError(hdr.deq_rid) + l[2] = True + if hdr.xid in self.__map: + self.__map[hdr.xid].append([fid, hdr, False]) # append to existing list + else: + self.__map[hdr.xid] = [[fid, hdr, False]] # create new list + + def contains(self, xid): + """Return True if the xid exists in the 
map; False otherwise""" + return xid in self.__map + + def delete(self, hdr): + """Remove a transaction record from the map using either a commit or abort header""" + if hdr.magic[-1] == "c": + return self._commit(hdr.xid) + if hdr.magic[-1] == "a": + self._abort(hdr.xid) + else: + raise jerr.InvalidRecordTypeError("delete from TxnMap", hdr.magic, hdr.rid) + + def find_rid(self, rid, xid_hint = None): + """ Search for and return map list with supplied rid. If xid_hint is supplied, try that xid first""" + if xid_hint != None and self.contains(xid_hint): + for l in self.__map[xid_hint]: + if l[1].rid == rid: + return l + for xid in self.__map.iterkeys(): + if xid_hint == None or xid != xid_hint: + for l in self.__map[xid]: + if l[1].rid == rid: + return l + + def get(self, xid): + """Return a list of operations for the given xid""" + if self.contains(xid): + return self.__map[xid] + + def report(self, show_stats, show_records): + """Return a string containing a text report for all records in the map""" + if len(self.__map) == 0: + return "No outstanding transactions found." + rstr = "%d outstanding transactions found" % len(self.__map) + if show_records: + rstr += ":" + for xid, tup in self.__map.iteritems(): + rstr += "\n xid=%s:" % jrnl.Utils.format_xid(xid) + for i in tup: + rstr += "\n %s" % str(i[1]) + else: + rstr += "." 
#== class JrnlAnalyzer ========================================================

class JrnlAnalyzer(object):
    """
    This class analyzes a set of journal files and determines which is the last to be written
    (the newest file), and hence which should be the first to be read for recovery (the oldest
    file).

    The analysis is performed on construction; the contents of the JrnlInfo object passed provide
    the recovery details.
    """

    def __init__(self, jinf):
        """Constructor. jinf is a JrnlInfo instance describing the journal to analyze."""
        self.__oldest = None
        self.__jinf = jinf
        self.__flist = self._analyze()

    def __str__(self):
        """String representation of this JrnlAnalyzer instance, will print out results of analysis."""
        ostr = "Journal files analyzed in directory %s (* = earliest full):\n" % self.__jinf.get_current_dir()
        if self.is_empty():
            ostr += " <All journal files are empty>\n"
        else:
            for tup in self.__flist:
                tmp = " "
                if tup[0] == self.__oldest[0]:
                    tmp = "*"
                ostr += " %s %s: owi=%-5s rid=0x%x, fro=0x%x ts=%s\n" % (tmp, os.path.basename(tup[1]), tup[2],
                                                                        tup[3], tup[4], tup[5])
            # Files beyond the last analyzed one have never been written
            for i in range(self.__flist[-1][0] + 1, self.__jinf.get_num_jrnl_files()):
                ostr += " %s.%04x.jdat: <empty>\n" % (self.__jinf.get_jrnl_base_name(), i)
        return ostr

    # Analysis

    def get_oldest_file(self):
        """Return a tuple (ordnum, jfn, owi, rid, fro, timestamp) for the oldest data file found in the journal"""
        return self.__oldest

    def get_oldest_file_index(self):
        """Return the ordinal number of the oldest data file found in the journal, or None if empty"""
        if self.is_empty():
            return None
        return self.__oldest[0]

    def is_empty(self):
        """Return true if the analysis found that the journal file has never been written to"""
        return len(self.__flist) == 0

    def _analyze(self):
        """Perform the journal file analysis by reading and comparing the file headers of each journal data file.
        The oldest file is the first whose overwrite indicator (owi) differs from that of file 0."""
        owi_found = False
        flist = []
        for i in range(0, self.__jinf.get_num_jrnl_files()):
            jfn = os.path.join(self.__jinf.get_current_dir(), "%s.%04x.jdat" % (self.__jinf.get_jrnl_base_name(), i))
            # BUGFIX: the file handle was never closed (leaked one handle per journal file)
            fhandle = open(jfn)
            try:
                fhdr = jrnl.Utils.load(fhandle, jrnl.Hdr)
            finally:
                fhandle.close()
            if fhdr.empty():
                break
            this_tup = (i, jfn, fhdr.owi(), fhdr.rid, fhdr.fro, fhdr.timestamp_str())
            flist.append(this_tup)
            if i == 0:
                init_owi = fhdr.owi()
                self.__oldest = this_tup
            elif fhdr.owi() != init_owi and not owi_found:
                self.__oldest = this_tup
                owi_found = True
        return flist
return flist + + +#== class JrnlReader ==================================================== + +class JrnlReader(object): + """ + This class contains an Enqueue Map (emap), a transaction map (tmap) and a transaction + object list (txn_obj_list) which are populated by reading the journals from the oldest + to the newest and analyzing each record. The JrnlInfo and JrnlAnalyzer + objects supplied on construction provide the information used for the recovery. + + The analysis is performed on construction. + """ + + def __init__(self, jinfo, jra, qflag = False, rflag = False, vflag = False): + """Constructor, which reads all """ + self._jinfo = jinfo + self._jra = jra + self._qflag = qflag + self._rflag = rflag + self._vflag = vflag + + # test callback functions for CSV tests + self._csv_store_chk = None + self._csv_start_cb = None + self._csv_enq_cb = None + self._csv_deq_cb = None + self._csv_txn_cb = None + self._csv_end_cb = None + + self._emap = EnqMap() + self._tmap = TxnMap(self._emap) + self._txn_obj_list = {} + + self._file = None + self._file_hdr = None + self._file_num = None + self._first_rec_flag = None + self._fro = None + self._last_file_flag = None + self._start_file_num = None + self._file_hdr_owi = None + self._warning = [] + + self._abort_cnt = 0 + self._commit_cnt = 0 + self._msg_cnt = 0 + self._rec_cnt = 0 + self._txn_msg_cnt = 0 + + def __str__(self): + """Print out all the undequeued records""" + return self.report(True, self._rflag) + + def emap(self): + """Get the enqueue map""" + return self._emap + + def get_abort_cnt(self): + """Get the cumulative number of transactional aborts found""" + return self._abort_cnt + + def get_commit_cnt(self): + """Get the cumulative number of transactional commits found""" + return self._commit_cnt + + def get_msg_cnt(self): + """Get the cumulative number of messages found""" + return self._msg_cnt + + def get_rec_cnt(self): + """Get the cumulative number of journal records (including fillers) found""" + return 
self._rec_cnt + + def is_last_file(self): + """Return True if the last file is being read""" + return self._last_file_flag + + def report(self, show_stats = True, show_records = False): + """Return a string containing a report on the file analysis""" + rstr = self._emap.report(show_stats, show_records) + "\n" + self._tmap.report(show_stats, show_records) + #TODO - print size analysis here - ie how full, sparse, est. space remaining before enq threshold + return rstr + + def run(self): + """Perform the read of the journal""" + if self._csv_start_cb != None and self._csv_start_cb(self._csv_store_chk): + return + if self._jra.is_empty(): + return + stop = self._advance_jrnl_file(*self._jra.get_oldest_file()) + while not stop and not self._get_next_record(): + pass + if self._csv_end_cb != None and self._csv_end_cb(self._csv_store_chk): + return + if not self._qflag: + print + + def set_callbacks(self, csv_store_chk, csv_start_cb = None, csv_enq_cb = None, csv_deq_cb = None, csv_txn_cb = None, + csv_end_cb = None): + """Set callbacks for checks to be made at various points while reading the journal""" + self._csv_store_chk = csv_store_chk + self._csv_start_cb = csv_start_cb + self._csv_enq_cb = csv_enq_cb + self._csv_deq_cb = csv_deq_cb + self._csv_txn_cb = csv_txn_cb + self._csv_end_cb = csv_end_cb + + def tmap(self): + """Return the transaction map""" + return self._tmap + + def get_txn_msg_cnt(self): + """Get the cumulative transactional message count""" + return self._txn_msg_cnt + + def txn_obj_list(self): + """Get a cumulative list of transaction objects (commits and aborts)""" + return self._txn_obj_list + + def _advance_jrnl_file(self, *oldest_file_info): + """Rotate to using the next journal file. 
Return False if the operation was successful, True if there are no + more files to read.""" + fro_seek_flag = False + if len(oldest_file_info) > 0: + self._start_file_num = self._file_num = oldest_file_info[0] + self._fro = oldest_file_info[4] + fro_seek_flag = True # jump to fro to start reading + if not self._qflag and not self._rflag: + if self._vflag: + print "Recovering journals..." + else: + print "Recovering journals", + if self._file != None and self._is_file_full(): + self._file.close() + self._file_num = self._incr_file_num() + if self._file_num == self._start_file_num: + return True + if self._start_file_num == 0: + self._last_file_flag = self._file_num == self._jinfo.get_num_jrnl_files() - 1 + else: + self._last_file_flag = self._file_num == self._start_file_num - 1 + if self._file_num < 0 or self._file_num >= self._jinfo.get_num_jrnl_files(): + raise jerr.BadFileNumberError(self._file_num) + jfn = os.path.join(self._jinfo.get_current_dir(), "%s.%04x.jdat" % + (self._jinfo.get_jrnl_base_name(), self._file_num)) + self._file = open(jfn) + self._file_hdr = jrnl.Utils.load(self._file, jrnl.Hdr) + if fro_seek_flag and self._file.tell() != self._fro: + self._file.seek(self._fro) + self._first_rec_flag = True + if not self._qflag: + if self._rflag: + print jfn, ": ", self._file_hdr + elif self._vflag: + print "* Reading %s" % jfn + else: + print ".", + sys.stdout.flush() + return False + + def _check_owi(self, hdr): + """Return True if the header's owi indicator matches that of the file header record; False otherwise. 
This can + indicate whether the last record in a file has been read and now older records which have not yet been + overwritten are now being read.""" + return self._file_hdr_owi == hdr.owi() + + def _is_file_full(self): + """Return True if the current file is full (no more write space); false otherwise""" + return self._file.tell() >= self._jinfo.get_jrnl_file_size_bytes() + + def _get_next_record(self): + """Get the next record in the file for analysis""" + if self._is_file_full(): + if self._advance_jrnl_file(): + return True + try: + hdr = jrnl.Utils.load(self._file, jrnl.Hdr) + except: + return True + if hdr.empty(): + return True + if hdr.check(): + return True + self._rec_cnt += 1 + self._file_hdr_owi = self._file_hdr.owi() + if self._first_rec_flag: + if self._file_hdr.fro != hdr.foffs: + raise jerr.FirstRecordOffsetMismatch(self._file_hdr.fro, hdr.foffs) + else: + if self._rflag: + print " * fro ok: 0x%x" % self._file_hdr.fro + self._first_rec_flag = False + stop = False + if isinstance(hdr, jrnl.EnqRec): + stop = self._handle_enq_rec(hdr) + elif isinstance(hdr, jrnl.DeqRec): + stop = self._handle_deq_rec(hdr) + elif isinstance(hdr, jrnl.TxnRec): + stop = self._handle_txn_rec(hdr) + wstr = "" + for warn in self._warning: + wstr += " (%s)" % warn + if self._rflag: + print " > %s %s" % (hdr, wstr) + self._warning = [] + return stop + + def _handle_deq_rec(self, hdr): + """Process a dequeue ("RHMd") record""" + if self._load_rec(hdr): + return True + + # Check OWI flag + if not self._check_owi(hdr): + self._warning.append("WARNING: OWI mismatch - could be overwrite boundary.") + return True + # Test hook + if self._csv_deq_cb != None and self._csv_deq_cb(self._csv_store_chk, hdr): + return True + + try: + if hdr.xid == None: + self._emap.delete(hdr.deq_rid) + else: + self._tmap.add(self._file_hdr.fid, hdr) + except jerr.JWarning, warn: + self._warning.append(str(warn)) + return False + + def _handle_enq_rec(self, hdr): + """Process a dequeue ("RHMe") 
record""" + if self._load_rec(hdr): + return True + + # Check extern flag + if hdr.extern and hdr.data != None: + raise jerr.ExternFlagDataError(hdr) + # Check OWI flag + if not self._check_owi(hdr): + self._warning.append("WARNING: OWI mismatch - could be overwrite boundary.") + return True + # Test hook + if self._csv_enq_cb != None and self._csv_enq_cb(self._csv_store_chk, hdr): + return True + + if hdr.xid == None: + self._emap.add(self._file_hdr.fid, hdr) + else: + self._txn_msg_cnt += 1 + self._tmap.add(self._file_hdr.fid, hdr) + self._msg_cnt += 1 + return False + + def _handle_txn_rec(self, hdr): + """Process a transaction ("RHMa or RHMc") record""" + if self._load_rec(hdr): + return True + + # Check OWI flag + if not self._check_owi(hdr): + self._warning.append("WARNING: OWI mismatch - could be overwrite boundary.") + return True + # Test hook + if self._csv_txn_cb != None and self._csv_txn_cb(self._csv_store_chk, hdr): + return True + + if hdr.magic[-1] == "a": + self._abort_cnt += 1 + else: + self._commit_cnt += 1 + + if self._tmap.contains(hdr.xid): + mismatched_rids = self._tmap.delete(hdr) + if mismatched_rids != None and len(mismatched_rids) > 0: + self._warning.append("WARNING: transactional dequeues not found in enqueue map; rids=%s" % + mismatched_rids) + else: + self._warning.append("WARNING: %s not found in transaction map" % jrnl.Utils.format_xid(hdr.xid)) + if hdr.magic[-1] == "c": # commits only + self._txn_obj_list[hdr.xid] = hdr + return False + + def _incr_file_num(self): + """Increment the number of files read with wraparound (ie after file n-1, go to 0)""" + self._file_num += 1 + if self._file_num >= self._jinfo.get_num_jrnl_files(): + self._file_num = 0 + return self._file_num + + def _load_rec(self, hdr): + """Load a single record for the given header. 
There may be arbitrarily large xids and data components.""" + while not hdr.complete(): + if self._advance_jrnl_file(): + return True + hdr.load(self._file) + return False + +# ============================================================================= + +if __name__ == "__main__": + print "This is a library, and cannot be executed." |